diff --git a/taskflow/decorators.py b/taskflow/decorators.py
index a8fad0c54..45f4098fd 100644
--- a/taskflow/decorators.py
+++ b/taskflow/decorators.py
@@ -35,14 +35,28 @@ def wraps(fn):
     return wrapper
 
 
-def locked(f):
+def locked(*args, **kwargs):
 
-    @wraps(f)
-    def wrapper(self, *args, **kwargs):
-        with self._lock:
-            return f(self, *args, **kwargs)
+    def decorator(f):
+        attr_name = kwargs.get('lock', '_lock')
 
-    return wrapper
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            lock = getattr(args[0], attr_name)
+            with lock:
+                return f(*args, **kwargs)
+
+        return wrapper
+
+    # This is needed to handle when the decorator has args or the decorator
+    # doesn't have args, python is rather weird here...
+    if kwargs or not args:
+        return decorator
+    else:
+        if len(args) == 1:
+            return decorator(args[0])
+        else:
+            return decorator
 
 
 def _original_function(fun):
diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py
index cc7362c74..7f572ff32 100644
--- a/taskflow/exceptions.py
+++ b/taskflow/exceptions.py
@@ -68,10 +68,10 @@ class JobNotFound(TaskFlowException):
 
 
 class MissingDependencies(InvalidStateException):
-    """Raised when a task has dependencies that can not be satisified."""
-    message = ("%(task)s requires %(requirements)s but no other task produces"
+    """Raised when a entity has dependencies that can not be satisified."""
+    message = ("%(who)s requires %(requirements)s but no other entity produces"
                " said requirements")
 
-    def __init__(self, task, requirements):
-        message = self.message % {'task': task, 'requirements': requirements}
+    def __init__(self, who, requirements):
+        message = self.message % {'who': who, 'requirements': requirements}
         super(MissingDependencies, self).__init__(message)
diff --git a/taskflow/flow.py b/taskflow/flow.py
index 4319913a6..8d03e48af 100644
--- a/taskflow/flow.py
+++ b/taskflow/flow.py
@@ -21,7 +21,6 @@ import threading
 
 from taskflow.openstack.common import uuidutils
 
-from taskflow import decorators
 from taskflow import exceptions as exc
 from taskflow import states
 from taskflow import utils
@@ -86,14 +85,14 @@ class Flow(object):
         # storage backend).
         self.notifier = utils.TransitionNotifier()
         self.task_notifier = utils.TransitionNotifier()
-        # Ensure that modifications and/or multiple runs aren't happening
-        # at the same time in the same flow at the same time.
-        self._lock = threading.RLock()
         # Assign this flow a unique identifer.
         if uuid:
             self._id = str(uuid)
         else:
             self._id = uuidutils.generate_uuid()
+        # Ensure we can not change the state at the same time in 2 different
+        # threads.
+        self._state_lock = threading.RLock()
 
     @property
     def name(self):
@@ -109,21 +108,26 @@ class Flow(object):
         """Provides a read-only view of the flow state."""
         return self._state
 
-    def _change_state(self, context, new_state):
-        was_changed = False
-        old_state = self.state
-        with self._lock:
+    def _change_state(self, context, new_state, check_func=None, notify=True):
+        old_state = None
+        changed = False
+        with self._state_lock:
             if self.state != new_state:
-                old_state = self.state
-                self._state = new_state
-                was_changed = True
-        if was_changed:
-            # Don't notify while holding the lock.
+                if (not check_func or
+                   (check_func and check_func(self.state))):
+                    changed = True
+                    old_state = self.state
+                    self._state = new_state
+        # Don't notify while holding the lock so that the reciever of said
+        # notifications can actually perform operations on the given flow
+        # without getting into deadlock.
+        if notify and changed:
             self.notifier.notify(self.state, details={
                 'context': context,
                 'flow': self,
                 'old_state': old_state,
             })
+        return changed
 
     def __str__(self):
         lines = ["Flow: %s" % (self.name)]
@@ -141,7 +145,6 @@ class Flow(object):
         """
         raise NotImplementedError()
 
-    @decorators.locked
     def add_many(self, tasks):
         """Adds many tasks to this flow.
 
@@ -158,54 +161,54 @@ class Flow(object):
 
         Returns how many tasks were interrupted (if any).
         """
-        if self.state in self.UNINTERRUPTIBLE_STATES:
-            raise exc.InvalidStateException(("Can not interrupt when"
-                                             " in state %s") % (self.state))
-        # Note(harlowja): Do *not* acquire the lock here so that the flow may
-        # be interrupted while running. This does mean the the above check may
-        # not be valid but we can worry about that if it becomes an issue.
-        old_state = self.state
-        if old_state != states.INTERRUPTED:
-            self._state = states.INTERRUPTED
-            self.notifier.notify(self.state, details={
-                'context': None,
-                'flow': self,
-                'old_state': old_state,
-            })
-        return 0
+        def check():
+            if self.state in self.UNINTERRUPTIBLE_STATES:
+                raise exc.InvalidStateException(("Can not interrupt when"
+                                                 " in state %s") % self.state)
+
+        check()
+        with self._state_lock:
+            check()
+            self._change_state(None, states.INTERRUPTED)
+            return 0
 
-    @decorators.locked
     def reset(self):
         """Fully resets the internal state of this flow, allowing for the flow
         to be ran again.
 
         Note: Listeners are also reset.
         """
-        if self.state not in self.RESETTABLE_STATES:
-            raise exc.InvalidStateException(("Can not reset when"
-                                             " in state %s") % (self.state))
-        self.notifier.reset()
-        self.task_notifier.reset()
-        self._change_state(None, states.PENDING)
+        def check():
+            if self.state not in self.RESETTABLE_STATES:
+                raise exc.InvalidStateException(("Can not reset when"
+                                                 " in state %s") % self.state)
+
+        check()
+        with self._state_lock:
+            check()
+            self.notifier.reset()
+            self.task_notifier.reset()
+            self._change_state(None, states.PENDING)
 
-    @decorators.locked
     def soft_reset(self):
         """Partially resets the internal state of this flow, allowing for the
-        flow to be ran again from an interrupted state only.
+        flow to be ran again from an interrupted state.
         """
-        if self.state not in self.SOFT_RESETTABLE_STATES:
-            raise exc.InvalidStateException(("Can not soft reset when"
-                                             " in state %s") % (self.state))
-        self._change_state(None, states.PENDING)
+        def check():
+            if self.state not in self.SOFT_RESETTABLE_STATES:
+                raise exc.InvalidStateException(("Can not soft reset when"
+                                                 " in state %s") % self.state)
 
-    @decorators.locked
+        check()
+        with self._state_lock:
+            check()
+            self._change_state(None, states.PENDING)
+
+    @abc.abstractmethod
     def run(self, context, *args, **kwargs):
         """Executes the workflow."""
-        if self.state not in self.RUNNABLE_STATES:
-            raise exc.InvalidStateException("Unable to run flow when "
-                                            "in state %s" % (self.state))
+        raise NotImplementedError()
 
-    @decorators.locked
     def rollback(self, context, cause):
         """Performs rollback of this workflow and any attached parent workflows
         if present.
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 320c34771..e1cf04775 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -49,7 +49,7 @@ class Flow(linear_flow.Flow):
         # to infer the edges at this stage we likely will fail finding
         # dependencies from nodes that don't exist.
         assert isinstance(task, collections.Callable)
-        r = utils.Runner(task)
+        r = utils.AOTRunner(task)
         self._graph.add_node(r, uuid=r.uuid, infer=infer)
         self._reset_internals()
         return r.uuid
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index a2da98c09..7ae17c549 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -17,7 +17,9 @@
 #    under the License.
 
 import collections
+import functools
 import logging
+import threading
 
 from taskflow.openstack.common import excutils
 
@@ -54,6 +56,7 @@ class Flow(flow.Flow):
         # All runners to run are collected here.
         self._runners = []
         self._connected = False
+        self._lock = threading.RLock()
         # The resumption strategy to use.
         self.resumer = None
 
@@ -61,7 +64,7 @@ class Flow(flow.Flow):
     def add(self, task):
         """Adds a given task to this flow."""
         assert isinstance(task, collections.Callable)
-        r = utils.Runner(task)
+        r = utils.AOTRunner(task)
         r.runs_before = list(reversed(self._runners))
         self._runners.append(r)
         self._reset_internals()
@@ -136,20 +139,27 @@ class Flow(flow.Flow):
 
     @decorators.locked
     def run(self, context, *args, **kwargs):
-        super(Flow, self).run(context, *args, **kwargs)
+
+        def abort_if(current_state, ok_states):
+            if current_state not in ok_states:
+                return False
+            return True
 
         def resume_it():
             if self._leftoff_at is not None:
                 return ([], self._leftoff_at)
             if self.resumer:
-                (finished, leftover) = self.resumer.resume(self,
-                                                           self._ordering())
+                (finished, leftover) = self.resumer(self, self._ordering())
             else:
                 finished = []
                 leftover = self._ordering()
             return (finished, leftover)
 
-        self._change_state(context, states.STARTED)
+        start_check_functor = functools.partial(abort_if,
+                                                ok_states=self.RUNNABLE_STATES)
+        if not self._change_state(context, states.STARTED,
+                                  check_func=start_check_functor):
+            return
         try:
             those_finished, leftover = resume_it()
         except Exception:
@@ -211,8 +221,13 @@ class Flow(flow.Flow):
                     })
                     self.rollback(context, cause)
 
+        run_check_functor = functools.partial(abort_if,
+                                              ok_states=[states.STARTED,
+                                                         states.RESUMING])
         if len(those_finished):
-            self._change_state(context, states.RESUMING)
+            if not self._change_state(context, states.RESUMING,
+                                      check_func=run_check_functor):
+                return
             for (r, details) in those_finished:
                 # Fake running the task so that we trigger the same
                 # notifications and state changes (and rollback that
@@ -222,8 +237,8 @@ class Flow(flow.Flow):
                 run_it(r, failed=failed, result=result, simulate_run=True)
 
         self._leftoff_at = leftover
-        self._change_state(context, states.RUNNING)
-        if self.state == states.INTERRUPTED:
+        if not self._change_state(context, states.RUNNING,
+                                  check_func=run_check_functor):
             return
 
         was_interrupted = False
diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py
index 687e16679..07dd9535a 100644
--- a/taskflow/patterns/resumption/logbook.py
+++ b/taskflow/patterns/resumption/logbook.py
@@ -111,7 +111,7 @@ class Resumption(object):
                 details = self._reconcile_versions(immediate_version, details)
         return (True, details)
 
-    def resume(self, flow, ordering):
+    def __call__(self, flow, ordering):
         """Splits the initial ordering into two segments, the first which
         has already completed (or errored) and the second which has not
         completed or errored.
diff --git a/taskflow/patterns/threaded_flow.py b/taskflow/patterns/threaded_flow.py
new file mode 100644
index 000000000..6869b63f7
--- /dev/null
+++ b/taskflow/patterns/threaded_flow.py
@@ -0,0 +1,636 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from taskflow import exceptions as exc
+from taskflow import flow
+from taskflow import graph_utils
+from taskflow import states
+from taskflow import utils
+
+import collections
+import functools
+import logging
+import sys
+import threading
+import weakref
+
+from networkx.algorithms import cycles
+from networkx.classes import digraph
+
+LOG = logging.getLogger(__name__)
+
+
+class DependencyTimeout(exc.InvalidStateException):
+    """When running in parallel a task has the ability to timeout waiting for
+    its dependent tasks to finish, this will be raised when that occurs.
+    """
+    pass
+
+
+class Flow(flow.Flow):
+    """This flow pattern establishes tasks into a graph where each task is a
+    node in the graph and dependencies between tasks are edges in the graph.
+    When running (in parallel) each task will only be activated when its
+    dependencies have been satisified. When a graph is split into two or more
+    segments, both of those segments will be ran in parallel.
+
+    For example lets take this small little *somewhat complicated* graph:
+
+    X--Y--C--D
+          |  |
+    A--B--    --G--
+          |  |     |--Z(end)
+    E--F--    --H--
+
+    In this flow the following will be ran in parallel at start:
+     1. X--Y
+     2. A--B
+     3. E--F
+    Note the C--D nodes will not be able to run until [Y,B,F] has completed.
+    After C--D completes the following will be ran in parallel:
+     1. G
+     2. H
+    Then finally Z will run (after [G,H] complete) and the flow will then have
+    finished executing.
+    """
+    MUTABLE_STATES = set([states.PENDING, states.FAILURE, states.SUCCESS])
+    REVERTABLE_STATES = set([states.FAILURE, states.INCOMPLETE])
+    CANCELLABLE_STATES = set([states.PENDING, states.RUNNING])
+
+    def __init__(self, name):
+        super(Flow, self).__init__(name)
+        self._graph = digraph.DiGraph(name=name)
+        self._run_lock = threading.RLock()
+        self._cancel_lock = threading.RLock()
+        self._mutate_lock = threading.RLock()
+        # NOTE(harlowja) The locking order in this list actually matters since
+        # we need to make sure that users of this list do not get deadlocked
+        # by out of order lock access.
+        self._core_locks = [
+            self._run_lock,
+            self._mutate_lock,
+            self._cancel_lock,
+        ]
+        self._run_locks = [
+            self._run_lock,
+            self._mutate_lock,
+        ]
+        self._cancel_locks = [
+            self._cancel_lock,
+        ]
+        self.results = {}
+        self.resumer = None
+
+    def __str__(self):
+        lines = ["ParallelFlow: %s" % (self.name)]
+        lines.append("%s" % (self._graph.number_of_nodes()))
+        lines.append("%s" % (self.state))
+        return "; ".join(lines)
+
+    def soft_reset(self):
+        # The way this flow works does not allow (at the current moment) for
+        # you to suspend the threads and then resume them at a later time,
+        # instead it only supports interruption (which will cancel the threads)
+        # and then a full reset.
+        raise NotImplementedError("Threaded flow does not currently support"
+                                  " soft resetting, please try using"
+                                  " reset() instead")
+
+    def interrupt(self):
+        """Currently we can not pause threads and then resume them later, not
+        really thinking that we should likely ever do this.
+        """
+        raise NotImplementedError("Threaded flow does not currently support"
+                                  " interruption, please try using"
+                                  " cancel() instead")
+
+    def reset(self):
+        # All locks are used so that resets can not happen while running or
+        # cancelling or modifying.
+        with utils.MultiLock(self._core_locks):
+            super(Flow, self).reset()
+            self.results = {}
+            self.resumer = None
+
+    def cancel(self):
+
+        def check():
+            if self.state not in self.CANCELLABLE_STATES:
+                raise exc.InvalidStateException("Can not attempt cancellation"
+                                                " when in state %s" %
+                                                self.state)
+
+        check()
+        cancelled = 0
+        was_empty = False
+
+        # We don't lock the other locks so that the flow can be cancelled while
+        # running. Further state management logic is then used while running
+        # to verify that the flow should still be running when it has been
+        # cancelled.
+        with utils.MultiLock(self._cancel_locks):
+            check()
+            if len(self._graph) == 0:
+                was_empty = True
+            else:
+                for r in self._graph.nodes_iter():
+                    try:
+                        if r.cancel(blocking=False):
+                            cancelled += 1
+                    except exc.InvalidStateException:
+                        pass
+            if cancelled or was_empty:
+                self._change_state(None, states.CANCELLED)
+
+        return cancelled
+
+    def _find_uuid(self, uuid):
+        # Finds the runner for the given uuid (or returns none)
+        for r in self._graph.nodes_iter():
+            if r.uuid == uuid:
+                return r
+        return None
+
+    def add(self, task, timeout=None, infer=True):
+        """Adds a task to the given flow using the given timeout which will be
+        used a the timeout to wait for dependencies (if any) to be
+        fulfilled.
+        """
+        def check():
+            if self.state not in self.MUTABLE_STATES:
+                raise exc.InvalidStateException("Flow is currently in a"
+                                                " non-mutable %s state" %
+                                                (self.state))
+
+        # Ensure that we do a quick check to see if we can even perform this
+        # addition before we go about actually acquiring the lock to perform
+        # the actual addition.
+        check()
+
+        # All locks must be acquired so that modifications can not be made
+        # while running, cancelling or performing a simultaneous mutation.
+        with utils.MultiLock(self._core_locks):
+            check()
+            runner = ThreadRunner(task, self, timeout)
+            self._graph.add_node(runner, infer=infer)
+            return runner.uuid
+
+    def _connect(self):
+        """Infers and connects the edges of the given tasks by examining the
+        associated tasks provides and requires attributes and connecting tasks
+        that require items to tasks that produce said items.
+        """
+
+        # Disconnect all edges not manually created before we attempt to infer
+        # them so that we don't retain edges that are invalid.
+        def disconnect_non_user(u, v, e_data):
+            if e_data and e_data.get('reason') != 'manual':
+                return True
+            return False
+
+        # Link providers to requirers.
+        graph_utils.connect(self._graph,
+                            discard_func=disconnect_non_user)
+
+        # Connect the successors & predecessors and related siblings
+        for r in self._graph.nodes_iter():
+            r._predecessors = []
+            r._successors = []
+            for (r2, _me) in self._graph.in_edges_iter([r]):
+                r._predecessors.append(r2)
+            for (_me, r2) in self._graph.out_edges_iter([r]):
+                r._successors.append(r2)
+            r.siblings = []
+            for r2 in self._graph.nodes_iter():
+                if r2 is r or r2 in r._predecessors or r2 in r._successors:
+                    continue
+                r._siblings.append(r2)
+
+    def add_many(self, tasks):
+        """Adds a list of tasks to the flow."""
+
+        def check():
+            if self.state not in self.MUTABLE_STATES:
+                raise exc.InvalidStateException("Flow is currently in a"
+                                                " non-mutable state %s"
+                                                % (self.state))
+
+        # Ensure that we do a quick check to see if we can even perform this
+        # addition before we go about actually acquiring the lock.
+        check()
+
+        # All locks must be acquired so that modifications can not be made
+        # while running, cancelling or performing a simultaneous mutation.
+        with utils.MultiLock(self._core_locks):
+            check()
+            added = []
+            for t in tasks:
+                added.append(self.add(t))
+            return added
+
+    def add_dependency(self, provider_uuid, consumer_uuid):
+        """Manually adds a dependency between a provider and a consumer."""
+
+        def check_and_fetch():
+            if self.state not in self.MUTABLE_STATES:
+                raise exc.InvalidStateException("Flow is currently in a"
+                                                " non-mutable state %s"
+                                                % (self.state))
+            provider = self._find_uuid(provider_uuid)
+            if not provider or not self._graph.has_node(provider):
+                raise exc.InvalidStateException("Can not add a dependency "
+                                                "from unknown uuid %s" %
+                                                (provider_uuid))
+            consumer = self._find_uuid(consumer_uuid)
+            if not consumer or not self._graph.has_node(consumer):
+                raise exc.InvalidStateException("Can not add a dependency "
+                                                "to unknown uuid %s"
+                                                % (consumer_uuid))
+            if provider is consumer:
+                raise exc.InvalidStateException("Can not add a dependency "
+                                                "to loop via uuid %s"
+                                                % (consumer_uuid))
+            return (provider, consumer)
+
+        check_and_fetch()
+
+        # All locks must be acquired so that modifications can not be made
+        # while running, cancelling or performing a simultaneous mutation.
+        with utils.MultiLock(self._core_locks):
+            (provider, consumer) = check_and_fetch()
+            self._graph.add_edge(provider, consumer, reason='manual')
+            LOG.debug("Connecting %s as a manual provider for %s",
+                      provider, consumer)
+
+    def run(self, context, *args, **kwargs):
+        """Executes the given flow using the given context and args/kwargs."""
+
+        def abort_if(current_state, ok_states):
+            if current_state in (states.CANCELLED,):
+                return False
+            if current_state not in ok_states:
+                return False
+            return True
+
+        def check():
+            if self.state not in self.RUNNABLE_STATES:
+                raise exc.InvalidStateException("Flow is currently unable "
+                                                "to be ran in state %s"
+                                                % (self.state))
+
+        def connect_and_verify():
+            """Do basic sanity tests on the graph structure."""
+            if len(self._graph) == 0:
+                return
+            self._connect()
+            degrees = [g[1] for g in self._graph.in_degree_iter()]
+            zero_degrees = [d for d in degrees if d == 0]
+            if not zero_degrees:
+                # If every task depends on something else to produce its input
+                # then we will be in a deadlock situation.
+                raise exc.InvalidStateException("No task has an in-degree"
+                                                " of zero")
+            self_loops = self._graph.nodes_with_selfloops()
+            if self_loops:
+                # A task that has a dependency on itself will never be able
+                # to run.
+                raise exc.InvalidStateException("%s tasks have been detected"
+                                                " with dependencies on"
+                                                " themselves" %
+                                                len(self_loops))
+            simple_cycles = len(cycles.recursive_simple_cycles(self._graph))
+            if simple_cycles:
+                # A task loop will never be able to run, unless it somehow
+                # breaks that loop.
+                raise exc.InvalidStateException("%s tasks have been detected"
+                                                " with dependency loops" %
+                                                simple_cycles)
+
+        def run_it(result_cb, args, kwargs):
+            check_runnable = functools.partial(abort_if,
+                                               ok_states=self.RUNNABLE_STATES)
+            if self._change_state(context, states.RUNNING,
+                                  check_func=check_runnable):
+                self.results = {}
+                if len(self._graph) == 0:
+                    return
+                for r in self._graph.nodes_iter():
+                    r.reset()
+                    r._result_cb = result_cb
+                executor = utils.ThreadGroupExecutor()
+                for r in self._graph.nodes_iter():
+                    executor.submit(r, *args, **kwargs)
+                executor.await_termination()
+
+        def trigger_rollback(failures):
+            if not failures:
+                return
+            causes = []
+            for r in failures:
+                causes.append(utils.FlowFailure(r, self,
+                                                r.exc, r.exc_info))
+            try:
+                self.rollback(context, causes)
+            except exc.InvalidStateException:
+                pass
+            finally:
+                # TODO(harlowja): re-raise a combined exception when
+                # there are more than one failures??
+                for f in failures:
+                    if all(f.exc_info):
+                        raise f.exc_info[0], f.exc_info[1], f.exc_info[2]
+
+        def handle_results():
+            # Isolate each runner state into groups so that we can easily tell
+            # which ones failed, cancelled, completed...
+            groups = collections.defaultdict(list)
+            for r in self._graph.nodes_iter():
+                groups[r.state].append(r)
+            for r in self._graph.nodes_iter():
+                if r not in groups.get(states.FAILURE, []) and r.has_ran():
+                    self.results[r.uuid] = r.result
+            if groups[states.FAILURE]:
+                self._change_state(context, states.FAILURE)
+                trigger_rollback(groups[states.FAILURE])
+            elif (groups[states.CANCELLED] or groups[states.PENDING]
+                  or groups[states.TIMED_OUT] or groups[states.STARTED]):
+                self._change_state(context, states.INCOMPLETE)
+            else:
+                check_ran = functools.partial(abort_if,
+                                              ok_states=[states.RUNNING])
+                self._change_state(context, states.SUCCESS,
+                                   check_func=check_ran)
+
+        def get_resumer_cb():
+            if not self.resumer:
+                return None
+            (ran, _others) = self.resumer(self, self._graph.nodes_iter())
+
+            def fetch_results(runner):
+                for (r, metadata) in ran:
+                    if r is runner:
+                        return (True, metadata.get('result'))
+                return (False, None)
+
+            result_cb = fetch_results
+            return result_cb
+
+        args = [context] + list(args)
+        check()
+
+        # Only acquire the run lock (but use further state checking) and the
+        # mutation lock to stop simultaneous running and simultaneous mutating
+        # which are not allowed on a running flow. Allow simultaneous cancel
+        # by performing repeated state checking while running.
+        with utils.MultiLock(self._run_locks):
+            check()
+            connect_and_verify()
+            try:
+                run_it(get_resumer_cb(), args, kwargs)
+            finally:
+                handle_results()
+
+    def rollback(self, context, cause):
+        """Rolls back all tasks that are *not* still pending or cancelled."""
+
+        def check():
+            if self.state not in self.REVERTABLE_STATES:
+                raise exc.InvalidStateException("Flow is currently unable "
+                                                "to be rolled back in "
+                                                "state %s" % (self.state))
+
+        check()
+
+        # All locks must be acquired so that modifications can not be made
+        # while another entity is running, rolling-back, cancelling or
+        # performing a mutation operation.
+        with utils.MultiLock(self._core_locks):
+            check()
+            accum = utils.RollbackAccumulator()
+            for r in self._graph.nodes_iter():
+                if r.has_ran():
+                    accum.add(utils.RollbackTask(context, r.task, r.result))
+            try:
+                self._change_state(context, states.REVERTING)
+                accum.rollback(cause)
+            finally:
+                self._change_state(context, states.FAILURE)
+
+
+class ThreadRunner(utils.Runner):
+    """A helper class that will use a countdown latch to avoid calling its
+    callable object until said countdown latch has emptied. After it has
+    been emptied the predecessor tasks will be examined for dependent results
+    and said results will then be provided to call the runners callable
+    object.
+
+    TODO(harlowja): this could be a 'future' like object in the future since it
+    is starting to have the same purpose and usage (in a way). Likely switch
+    this over to the task details object or a subclass of it???
+    """
+    RESETTABLE_STATES = set([states.PENDING, states.SUCCESS, states.FAILURE,
+                             states.CANCELLED])
+    RUNNABLE_STATES = set([states.PENDING])
+    CANCELABLE_STATES = set([states.PENDING])
+    SUCCESS_STATES = set([states.SUCCESS])
+    CANCEL_SUCCESSORS_WHEN = set([states.FAILURE, states.CANCELLED,
+                                  states.TIMED_OUT])
+    NO_RAN_STATES = set([states.CANCELLED, states.PENDING, states.TIMED_OUT,
+                         states.RUNNING])
+
+    def __init__(self, task, flow, timeout):
+        super(ThreadRunner, self).__init__(task)
+        # Use weak references to give the GC a break.
+        self._flow = weakref.proxy(flow)
+        self._notifier = flow.task_notifier
+        self._timeout = timeout
+        self._state = states.PENDING
+        self._run_lock = threading.RLock()
+        # Use the flows state lock so that state notifications are not sent
+        # simultaneously for a given flow.
+        self._state_lock = flow._state_lock
+        self._cancel_lock = threading.RLock()
+        self._latch = utils.CountDownLatch()
+        # Any related family.
+        self._predecessors = []
+        self._successors = []
+        self._siblings = []
+        # Ensure we capture any exceptions that may have been triggered.
+        self.exc = None
+        self.exc_info = (None, None, None)
+        # This callback will be called before the underlying task is actually
+        # returned and it should either return a tuple of (has_result, result)
+        self._result_cb = None
+
+    @property
+    def state(self):
+        return self._state
+
+    def has_ran(self):
+        if self.state in self.NO_RAN_STATES:
+            return False
+        return True
+
+    def _change_state(self, context, new_state):
+        old_state = None
+        changed = False
+        with self._state_lock:
+            if self.state != new_state:
+                old_state = self.state
+                self._state = new_state
+                changed = True
+        # Don't notify while holding the lock so that the reciever of said
+        # notifications can actually perform operations on the given runner
+        # without getting into deadlock.
+        if changed and self._notifier:
+            self._notifier.notify(self.state, details={
+                'context': context,
+                'flow': self._flow,
+                'old_state': old_state,
+                'runner': self,
+            })
+
+    def cancel(self, blocking=True):
+
+        def check():
+            if self.state not in self.CANCELABLE_STATES:
+                raise exc.InvalidStateException("Runner not in a cancelable"
+                                                " state: %s" % (self.state))
+
+        # Check before as a quick way out of attempting to acquire the more
+        # heavy-weight lock. Then acquire the lock (which should not be
+        # possible if we are currently running) and set the state (if still
+        # applicable).
+        check()
+        acquired = False
+        cancelled = False
+        try:
+            acquired = self._cancel_lock.acquire(blocking=blocking)
+            if acquired:
+                check()
+                cancelled = True
+                self._change_state(None, states.CANCELLED)
+        finally:
+            if acquired:
+                self._cancel_lock.release()
+        return cancelled
+
+    def reset(self):
+
+        def check():
+            if self.state not in self.RESETTABLE_STATES:
+                raise exc.InvalidStateException("Runner not in a resettable"
+                                                " state: %s" % (self.state))
+
+        def do_reset():
+            self._latch.count = len(self._predecessors)
+            self.exc = None
+            self.exc_info = (None, None, None)
+            self.result = None
+            self._change_state(None, states.PENDING)
+
+        # We need to acquire both locks here so that we can not be running
+        # or being cancelled at the same time we are resetting.
+        check()
+        with self._run_lock:
+            check()
+            with self._cancel_lock:
+                check()
+                do_reset()
+
+    @property
+    def runs_before(self):
+        # NOTE(harlowja): this list may change, depending on which other
+        # runners have completed (or are currently actively running), so
+        # this is why this is a property instead of a semi-static defined list
+        # like in the AOT class. The list should only get bigger and not
+        # smaller so it should be fine to filter on runners that have completed
+        # successfully.
+        finished_ok = []
+        for r in self._siblings:
+            if r.has_ran() and r.state in self.SUCCESS_STATES:
+                finished_ok.append(r)
+        return finished_ok
+
+    def __call__(self, context, *args, **kwargs):
+
+        def is_runnable():
+            if self.state not in self.RUNNABLE_STATES:
+                return False
+            return True
+
+        def run(*args, **kwargs):
+            try:
+                self._change_state(context, states.RUNNING)
+                has_result = False
+                if self._result_cb:
+                    has_result, self.result = self._result_cb(self)
+                if not has_result:
+                    super(ThreadRunner, self).__call__(*args, **kwargs)
+                self._change_state(context, states.SUCCESS)
+            except Exception as e:
+                self._change_state(context, states.FAILURE)
+                self.exc = e
+                self.exc_info = sys.exc_info()
+
+        def signal():
+            if not self._successors:
+                return
+            if self.state in self.CANCEL_SUCCESSORS_WHEN:
+                for r in self._successors:
+                    try:
+                        r.cancel(blocking=False)
+                    except exc.InvalidStateException:
+                        pass
+            for r in self._successors:
+                try:
+                    r._latch.countDown()
+                except Exception:
+                    LOG.exception("Failed decrementing %s latch", r)
+
+        # We check before to avoid attempting to acquire the lock when we are
+        # known to be in a non-runnable state.
+        if not is_runnable():
+            return
+        args = [context] + list(args)
+        with self._run_lock:
+            # We check after we now own the run lock since a previous thread
+            # could have exited and released that lock and set the state to
+            # not runnable.
+            if not is_runnable():
+                return
+            may_proceed = self._latch.await(self._timeout)
+            # We now acquire the cancel lock so that we can be assured that
+            # we have not been cancelled by another entity.
+            with self._cancel_lock:
+                try:
+                    # If we have been cancelled after awaiting and timing out
+                    # ensure that we alter the state to show timed out (but
+                    # not if we have been cancelled, since our state should
+                    # be cancelled instead). This is done after acquiring the
+                    # cancel lock so that we will not try to overwrite another
+                    # entity trying to set the runner to the cancel state.
+                    if not may_proceed and self.state != states.CANCELLED:
+                        self._change_state(context, states.TIMED_OUT)
+                    # We at this point should only have been able to time out
+                    # or be cancelled, no other state transitions should have
+                    # been possible.
+                    if self.state not in (states.CANCELLED, states.TIMED_OUT):
+                        run(*args, **kwargs)
+                finally:
+                    signal()
diff --git a/taskflow/states.py b/taskflow/states.py
index 48d320a70..b3ff92931 100644
--- a/taskflow/states.py
+++ b/taskflow/states.py
@@ -33,8 +33,12 @@ REVERTING = 'REVERTING'
 RUNNING = RUNNING
 STARTED = 'STARTED'
 SUCCESS = SUCCESS
+CANCELLED = 'CANCELLED'
+INCOMPLETE = 'INCOMPLETE'
 
 # Task states.
 FAILURE = FAILURE
 STARTED = STARTED
 SUCCESS = SUCCESS
+TIMED_OUT = 'TIMED_OUT'
+CANCELLED = CANCELLED
diff --git a/taskflow/tests/unit/test_threaded_flow.py b/taskflow/tests/unit/test_threaded_flow.py
new file mode 100644
index 000000000..a93329c51
--- /dev/null
+++ b/taskflow/tests/unit/test_threaded_flow.py
@@ -0,0 +1,388 @@
+# -*- coding: utf-8 -*-
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import threading
+import time
+import unittest2
+
+from taskflow import decorators
+from taskflow import exceptions as excp
+from taskflow import states
+
+from taskflow.patterns import threaded_flow as tf
+from taskflow.tests import utils
+
+
+def _find_idx(what, search_where):
+    for i, j in enumerate(search_where):
+        if i == what:
+            return j
+    return -1
+
+
+class ThreadedFlowTest(unittest2.TestCase):
+    def _make_tracking_flow(self, name):
+        notify_lock = threading.RLock()
+        flo = tf.Flow(name)
+        notifications = []
+
+        def save_notify(state, details):
+            runner = details.get('runner')
+            if not runner:
+                return
+            with notify_lock:
+                notifications.append((runner.uuid, state, dict(details)))
+
+        flo.task_notifier.register('*', save_notify)
+        return (flo, notifications)
+
+    def _make_watched_flow(self, name):
+        history_lock = threading.RLock()
+        flo = tf.Flow(name)
+        history = {}
+
+        def save_state(state, details):
+            runner = details.get('runner')
+            if not runner:
+                return
+            with history_lock:
+                old_state = details.get('old_state')
+                old_states = history.get(runner.uuid, [])
+                if not old_states:
+                    old_states.append(old_state)
+                old_states.append(state)
+                history[runner.uuid] = old_states
+
+        flo.task_notifier.register('*', save_state)
+        return (flo, history)
+
+    def test_somewhat_complicated(self):
+        """Tests a somewhat complicated dependency graph.
+
+            X--Y--C--D
+                  |  |
+            A--B--    --G--
+                  |  |     |--Z(end)
+            E--F--    --H--
+        """
+        (flo, notifications) = self._make_tracking_flow("sanity-test")
+
+        # X--Y
+        x = flo.add(utils.ProvidesRequiresTask("X",
+                                               provides=['x'],
+                                               requires=[]))
+        y = flo.add(utils.ProvidesRequiresTask("Y",
+                                               provides=['y'],
+                                               requires=['x']))
+
+        # A--B
+        a = flo.add(utils.ProvidesRequiresTask("A",
+                                               provides=['a'],
+                                               requires=[]))
+        b = flo.add(utils.ProvidesRequiresTask("B",
+                                               provides=['b'],
+                                               requires=['a']))
+
+        # E--F
+        e = flo.add(utils.ProvidesRequiresTask("E",
+                                               provides=['e'],
+                                               requires=[]))
+        f = flo.add(utils.ProvidesRequiresTask("F",
+                                               provides=['f'],
+                                               requires=['e']))
+
+        # C--D
+        c = flo.add(utils.ProvidesRequiresTask("C",
+                                               provides=['c'],
+                                               requires=['f', 'b', 'y']))
+        d = flo.add(utils.ProvidesRequiresTask("D",
+                                               provides=['d'],
+                                               requires=['c']))
+
+        # G
+        g = flo.add(utils.ProvidesRequiresTask("G",
+                                               provides=['g'],
+                                               requires=['d']))
+
+        # H
+        h = flo.add(utils.ProvidesRequiresTask("H",
+                                               provides=['h'],
+                                               requires=['d']))
+
+        # Z
+        z = flo.add(utils.ProvidesRequiresTask("Z",
+                                               provides=['z'],
+                                               requires=['g', 'h']))
+
+        all_uuids = [z, h, g, d, c, f, e, b, a, y, x]
+        self.assertEquals(states.PENDING, flo.state)
+        flo.run({})
+        self.assertEquals(states.SUCCESS, flo.state)
+
+        # Analyze the notifications to determine that the correct ordering
+        # occurred
+
+        # Discard states we aren't really interested in.
+        c_notifications = []
+        uuids_ran = set()
+        for (uuid, state, details) in notifications:
+            if state not in [states.RUNNING, states.SUCCESS, states.FAILURE]:
+                continue
+            uuids_ran.add(uuid)
+            c_notifications.append((uuid, state, details))
+        notifications = c_notifications
+        self.assertEquals(len(all_uuids), len(uuids_ran))
+
+        # Select out the run order
+        just_ran_uuids = []
+        for (uuid, state, details) in notifications:
+            if state not in [states.RUNNING]:
+                continue
+            just_ran_uuids.append(uuid)
+
+        def ran_before(ran_uuid, before_what):
+            before_idx = just_ran_uuids.index(ran_uuid)
+            other_idxs = [just_ran_uuids.index(u) for u in before_what]
+            was_before = True
+            for idx in other_idxs:
+                if idx < before_idx:
+                    was_before = False
+            return was_before
+
+        def ran_after(ran_uuid, after_what):
+            after_idx = just_ran_uuids.index(ran_uuid)
+            other_idxs = [just_ran_uuids.index(u) for u in after_what]
+            was_after = True
+            for idx in other_idxs:
+                if idx > after_idx:
+                    was_after = False
+            return was_after
+
+        # X, A, E should always run before the others
+        self.assertTrue(ran_before(x, [c, d, g, h, z]))
+        self.assertTrue(ran_before(a, [c, d, g, h, z]))
+        self.assertTrue(ran_before(e, [c, d, g, h, z]))
+
+        # Y, B, F should always run before C
+        self.assertTrue(ran_before(y, [c]))
+        self.assertTrue(ran_before(b, [c]))
+        self.assertTrue(ran_before(f, [c]))
+
+        # C runs before D
+        self.assertTrue(ran_before(c, [d]))
+
+        # G and H are before Z
+        self.assertTrue(ran_before(g, [z]))
+        self.assertTrue(ran_before(h, [z]))
+
+        # C, D runs after X, Y, B, E, F
+        self.assertTrue(ran_after(c, [x, y, b, e, f]))
+        self.assertTrue(ran_after(d, [x, y, b, c, e, f]))
+
+        # Z is last
+        all_uuids_no_z = list(all_uuids)
+        all_uuids_no_z.remove(z)
+        self.assertTrue(ran_after(z, all_uuids_no_z))
+
+    def test_empty_cancel(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        self.assertEquals(states.PENDING, flo.state)
+        flo.cancel()
+        self.assertEquals(states.CANCELLED, flo.state)
+
+    def test_self_loop_flo(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        flo.add(utils.ProvidesRequiresTask("do-that",
+                                           provides=['c'],
+                                           requires=['c']))
+        self.assertRaises(excp.InvalidStateException, flo.run, {})
+
+    def test_circular_flo(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        flo.add(utils.ProvidesRequiresTask("do-that",
+                                           provides=['c'],
+                                           requires=['a']))
+        flo.add(utils.ProvidesRequiresTask("do-this",
+                                           provides=['a'],
+                                           requires=['c']))
+        self.assertRaises(excp.InvalidStateException, flo.run, {})
+
+    def test_no_input_flo(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        flo.add(utils.ProvidesRequiresTask("do-that",
+                                           provides=['c'],
+                                           requires=['a']))
+        flo.add(utils.ProvidesRequiresTask("do-this",
+                                           provides=['b'],
+                                           requires=['c']))
+        self.assertRaises(excp.InvalidStateException, flo.run, {})
+
+    def test_simple_resume(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        f_uuid = flo.add(utils.ProvidesRequiresTask("do-this",
+                                                    provides=['a'],
+                                                    requires=[]))
+        flo.add(utils.ProvidesRequiresTask("do-that",
+                                           provides=['c'],
+                                           requires=['a']))
+
+        def resume_it(flow, ordering):
+            ran_already = []
+            not_ran = []
+            for r in ordering:
+                if r.uuid == f_uuid:
+                    ran_already.append((r, {
+                        'result': 'b',
+                        'states': [states.SUCCESS],
+                    }))
+                else:
+                    not_ran.append(r)
+            return (ran_already, not_ran)
+
+        flo.resumer = resume_it
+        flo.run({})
+        self.assertEquals('b', flo.results[f_uuid])
+        self.assertEquals(states.SUCCESS, flo.state)
+
+    def test_active_cancel(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        flo.add(utils.ProvidesRequiresTask("do-this",
+                                           provides=['a'],
+                                           requires=[]))
+        flo.add(utils.ProvidesRequiresTask("do-that",
+                                           provides=['c'],
+                                           requires=['a']))
+
+        @decorators.task(provides=['d'], requires=['c'])
+        def cancel_it(context, c):
+            am_cancelled = flo.cancel()
+            return am_cancelled
+
+        uuid = flo.add(cancel_it)
+        flo.add(utils.ProvidesRequiresTask("do-the-other",
+                                           provides=['e'],
+                                           requires=['d']))
+
+        flo.run({})
+        self.assertIn(uuid, flo.results)
+        self.assertEquals(states.INCOMPLETE, flo.state)
+        self.assertEquals(1, flo.results[uuid])
+
+    def test_sanity_run(self):
+        (flo, history) = self._make_watched_flow("sanity-test")
+        flo.add(utils.ProvidesRequiresTask("do-this",
+                                           provides=['a'],
+                                           requires=[]))
+        flo.add(utils.ProvidesRequiresTask("do-that",
+                                           provides=['c'],
+                                           requires=['a']))
+        flo.add(utils.ProvidesRequiresTask("do-other",
+                                           provides=['d'],
+                                           requires=[]))
+        flo.add(utils.ProvidesRequiresTask("do-thing",
+                                           provides=['e'],
+                                           requires=['d']))
+        self.assertEquals(states.PENDING, flo.state)
+        context = {}
+        flo.run(context)
+        self.assertEquals(states.SUCCESS, flo.state)
+        self.assertTrue(len(context) > 0)
+        # Even when running in parallel this will be the required order since
+        # 'do-that' depends on 'do-this' finishing first.
+        expected_order = ['do-this', 'do-that']
+        this_that = [t for t in context[utils.ORDER_KEY]
+                     if t in expected_order]
+        self.assertEquals(expected_order, this_that)
+        expected_order = ['do-other', 'do-thing']
+        this_that = [t for t in context[utils.ORDER_KEY]
+                     if t in expected_order]
+        self.assertEquals(expected_order, this_that)
+
+    def test_single_failure(self):
+
+        def reverter(context, result, cause):
+            context['reverted'] = True
+
+        @decorators.task(revert_with=reverter)
+        def fail_quick(context):
+            raise IOError("Broken")
+
+        (flo, history) = self._make_watched_flow('test-single-fail')
+        f_uuid = flo.add(fail_quick)
+        context = {}
+        self.assertRaises(IOError, flo.run, context)
+        self.assertEquals(states.FAILURE, flo.state)
+        self.assertEquals(states.FAILURE, history[f_uuid][-1])
+        self.assertTrue(context.get('reverted'))
+
+    def test_failure_cancel_successors(self):
+        (flo, history) = self._make_watched_flow("failure-cancel")
+
+        @decorators.task(provides=['b', 'c'])
+        def fail_quick(context):
+            raise IOError("Broken")
+
+        @decorators.task
+        def after_fail(context, b):
+            pass
+
+        @decorators.task
+        def after_fail2(context, c):
+            pass
+
+        fq, af, af2 = flo.add_many([fail_quick, after_fail, after_fail2])
+        self.assertEquals(states.PENDING, flo.state)
+
+        context = {}
+        self.assertRaises(IOError, flo.run, context)
+        self.assertEquals(states.FAILURE, flo.state)
+        self.assertEquals(states.FAILURE, history[fq][-1])
+        self.assertEquals(states.CANCELLED, history[af][-1])
+        self.assertEquals(states.CANCELLED, history[af2][-1])
+
+    def test_live_timeout(self):
+
+        @decorators.task(provides=['a'])
+        def task_long(context):
+            time.sleep(1)
+            return {
+                'a': 2,
+            }
+
+        @decorators.task(provides=['b'])
+        def wait_short(context, a):
+            pass
+
+        @decorators.task
+        def wait_ok_long(context, a):
+            pass
+
+        @decorators.task
+        def wait_after_short(context, b):
+            pass
+
+        (flo, history) = self._make_watched_flow('test-live')
+        flo.add(task_long)
+        ws_uuid = flo.add(wait_short, timeout=0.1)
+        flo.add(wait_ok_long)
+        was_uuid = flo.add(wait_after_short)
+
+        flo.run({})
+        self.assertEquals(states.INCOMPLETE, flo.state)
+        self.assertEquals(states.TIMED_OUT, history[ws_uuid][-1])
+        self.assertEquals(states.CANCELLED, history[was_uuid][-1])
diff --git a/taskflow/utils.py b/taskflow/utils.py
index 91cd34f1c..f4ce58afe 100644
--- a/taskflow/utils.py
+++ b/taskflow/utils.py
@@ -24,6 +24,7 @@ import logging
 import re
 import sys
 import threading
+import threading2
 import time
 
 from taskflow.openstack.common import uuidutils
@@ -32,6 +33,25 @@ TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'
 LOG = logging.getLogger(__name__)
 
 
+def await(check_functor, timeout=None):
+    if timeout is not None:
+        end_time = time.time() + max(0, timeout)
+    else:
+        end_time = None
+    # Use the same/similar scheme that the python condition class uses.
+    delay = 0.0005
+    while not check_functor():
+        time.sleep(delay)
+        if end_time is not None:
+            remaining = end_time - time.time()
+            if remaining <= 0:
+                return False
+            delay = min(delay * 2, remaining, 0.05)
+        else:
+            delay = min(delay * 2, 0.05)
+    return True
+
+
 def get_task_version(task):
     """Gets a tasks *string* version, whether it is a task object/function."""
     task_version = getattr(task, 'version')
@@ -79,23 +99,80 @@ def is_version_compatible(version_1, version_2):
     return True
 
 
-def await(check_functor, timeout=None):
-    if timeout is not None:
-        end_time = time.time() + max(0, timeout)
-    else:
+class MultiLock(object):
+    """A class which can attempt to obtain many locks at once and release
+    said locks when exiting.
+
+    Useful as a context manager around many locks (instead of having to nest
+    said individual context managers).
+    """
+
+    def __init__(self, locks):
+        assert len(locks) > 0, "Zero locks requested"
+        self._locks = locks
+        self._locked = [False] * len(locks)
+
+    def __enter__(self):
+
+        def is_locked(lock):
+            # NOTE(harlowja): the threading2 lock doesn't seem to have this
+            # attribute, so thats why we are checking it existing first.
+            if hasattr(lock, 'locked'):
+                return lock.locked()
+            return False
+
+        for i in xrange(0, len(self._locked)):
+            if self._locked[i] or is_locked(self._locks[i]):
+                raise threading.ThreadError("Lock %s not previously released"
+                                            % (i + 1))
+            self._locked[i] = False
+        for (i, lock) in enumerate(self._locks):
+            self._locked[i] = lock.acquire()
+
+    def __exit__(self, type, value, traceback):
+        for (i, locked) in enumerate(self._locked):
+            try:
+                if locked:
+                    self._locks[i].release()
+                    self._locked[i] = False
+            except threading.ThreadError:
+                LOG.exception("Unable to release lock %s", i + 1)
+
+
+class CountDownLatch(object):
+    """Similar in concept to the java count down latch."""
+
+    def __init__(self, count=0):
+        self.count = count
+        self.lock = threading.Condition()
+
+    def countDown(self):
+        with self.lock:
+            self.count -= 1
+            if self.count <= 0:
+                self.lock.notifyAll()
+
+    def await(self, timeout=None):
         end_time = None
-    # Use the same/similar scheme that the python condition class uses.
-    delay = 0.0005
-    while not check_functor():
-        time.sleep(delay)
-        if end_time is not None:
-            remaining = end_time - time.time()
-            if remaining <= 0:
-                return False
-            delay = min(delay * 2, remaining, 0.05)
-        else:
-            delay = min(delay * 2, 0.05)
-    return True
+        if timeout is not None:
+            timeout = max(0, timeout)
+            end_time = time.time() + timeout
+        time_up = False
+        with self.lock:
+            while True:
+                # Stop waiting on these 2 conditions.
+                if time_up or self.count <= 0:
+                    break
+                # Was this a spurious wakeup or did we really end??
+                self.lock.wait(timeout=timeout)
+                if end_time is not None:
+                    if time.time() >= end_time:
+                        time_up = True
+                    else:
+                        # Reduce the timeout so that we don't wait extra time
+                        # over what we initially were requested to.
+                        timeout = end_time - time.time()
+            return self.count <= 0
 
 
 class LastFedIter(object):
@@ -113,16 +190,45 @@ class LastFedIter(object):
             yield i
 
 
+class ThreadGroupExecutor(object):
+    """A simple thread executor that spins up new threads (or greenthreads) for
+    each task to be completed (no pool limit is enforced).
+
+    TODO(harlowja): Likely if we use the more advanced executors that come with
+    the concurrent.futures library we can just get rid of this.
+    """
+
+    def __init__(self, daemonize=True):
+        self._threads = []
+        self._group = threading2.ThreadGroup()
+        self._daemonize = daemonize
+
+    def submit(self, fn, *args, **kwargs):
+        t = threading2.Thread(target=fn, group=self._group,
+                              args=args, kwargs=kwargs)
+        t.daemon = self._daemonize
+        self._threads.append(t)
+        t.start()
+
+    def await_termination(self, timeout=None):
+        if not self._threads:
+            return
+        return self._group.join(timeout)
+
+
 class FlowFailure(object):
     """When a task failure occurs the following object will be given to revert
        and can be used to interrogate what caused the failure.
     """
 
-    def __init__(self, runner, flow, exception):
+    def __init__(self, runner, flow, exc, exc_info=None):
         self.runner = runner
         self.flow = flow
-        self.exc = exception
-        self.exc_info = sys.exc_info()
+        self.exc = exc
+        if not exc_info:
+            self.exc_info = sys.exc_info()
+        else:
+            self.exc_info = exc_info
 
 
 class RollbackTask(object):
@@ -161,7 +267,6 @@ class Runner(object):
         else:
             self.task = task
         self.providers = {}
-        self.runs_before = []
         self.result = None
         if not uuid:
             self._id = uuidutils.generate_uuid()
@@ -184,6 +289,10 @@ class Runner(object):
     def optional(self):
         return self.task.optional
 
+    @property
+    def runs_before(self):
+        return []
+
     @property
     def version(self):
         return get_task_version(self.task)
@@ -205,24 +314,52 @@ class Runner(object):
         # Find all of our inputs first.
         kwargs = dict(kwargs)
         for (k, who_made) in self.providers.iteritems():
-            if who_made.result and k in who_made.result:
+            if k in kwargs:
+                continue
+            try:
                 kwargs[k] = who_made.result[k]
-            else:
-                kwargs[k] = None
+            except (TypeError, KeyError):
+                pass
         optional_keys = self.optional
-        optional_missing_keys = optional_keys - set(kwargs.keys())
-        if optional_missing_keys:
-            for k in optional_missing_keys:
-                for r in self.runs_before:
-                    r_provides = r.provides
-                    if k in r_provides and r.result and k in r.result:
-                        kwargs[k] = r.result[k]
-                        break
+        optional_keys = optional_keys - set(kwargs.keys())
+        for k in optional_keys:
+            for who_ran in self.runs_before:
+                matched = False
+                if k in who_ran.provides:
+                    try:
+                        kwargs[k] = who_ran.result[k]
+                        matched = True
+                    except (TypeError, KeyError):
+                        pass
+                if matched:
+                    break
+        # Ensure all required keys are either existent or set to none.
+        for k in self.requires:
+            if k not in kwargs:
+                kwargs[k] = None
         # And now finally run.
         self.result = self.task(*args, **kwargs)
         return self.result
 
 
+class AOTRunner(Runner):
+    """A runner that knows who runs before this runner ahead of time from a
+    known list of previous runners.
+    """
+
+    def __init__(self, task):
+        super(AOTRunner, self).__init__(task)
+        self._runs_before = []
+
+    @property
+    def runs_before(self):
+        return self._runs_before
+
+    @runs_before.setter
+    def runs_before(self, runs_before):
+        self._runs_before = list(runs_before)
+
+
 class TransitionNotifier(object):
     """A utility helper class that can be used to subscribe to
     notifications of events occuring as well as allow a entity to post said
@@ -300,16 +437,12 @@ class RollbackAccumulator(object):
     def __len__(self):
         return len(self._rollbacks)
 
-    def __iter__(self):
-        # Rollbacks happen in the reverse order that they were added.
-        return reversed(self._rollbacks)
-
     def __enter__(self):
         return self
 
     def rollback(self, cause):
         LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
-        for (i, f) in enumerate(self):
+        for (i, f) in enumerate(reversed(self._rollbacks)):
             LOG.debug("Calling rollback %s: %s", i + 1, f)
             try:
                 f(cause)
@@ -399,45 +532,6 @@ class ReaderWriterLock(object):
             self.readers_ok.release()
 
 
-class MultiLock(object):
-    """A class which can attempt to obtain many locks at once and release
-    said locks when exiting.
-
-    Useful as a context manager around many locks (instead of having to nest
-    said individual context managers).
-    """
-
-    def __init__(self, locks):
-        self._locks = locks
-        self._locked = list([False] * len(locks))
-
-    def __enter__(self):
-
-        def is_locked(lock):
-            # NOTE(harlowja): the threading2 lock doesn't seem to have this
-            # attribute, so thats why we are checking it existing first.
-            if hasattr(lock, 'locked'):
-                return lock.locked()
-            return False
-
-        for i in xrange(0, len(self._locked)):
-            if self._locked[i] or is_locked(self._locks[i]):
-                raise threading.ThreadError("Lock %s not previously released"
-                                            % (i + 1))
-            self._locked[i] = False
-        for (i, lock) in enumerate(self._locks):
-            self._locked[i] = lock.acquire()
-
-    def __exit__(self, type, value, traceback):
-        for (i, locked) in enumerate(self._locked):
-            try:
-                if locked:
-                    self._locks[i].release()
-                    self._locked[i] = False
-            except threading.ThreadError:
-                LOG.exception("Unable to release lock %s", i + 1)
-
-
 class LazyPluggable(object):
     """A pluggable backend loaded lazily based on some value."""
 
diff --git a/tools/pip-requires b/tools/pip-requires
index 6587da879..3f7082a9c 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -3,8 +3,6 @@ anyjson>=0.2.4
 eventlet>=0.9.17
 iso8601
 netaddr>=0.7.6
-# Very nice graph library
-networkx>=1.5
 # Need currently the newest version of oslo.config for the DeprecatedOpt
 # used inside oslo database layer.
 -f http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a3.tar.gz#egg=oslo.config-1.2.0a3
@@ -13,4 +11,7 @@ six
 # Only needed if database backend used.
 sqlalchemy>=0.7,<=0.7.99
 alembic>=0.4.1
+# Very nice graph library
+networkx>=1.8.1
+threading2
 Babel>=0.9.6