Reintegrate parallel action
Fixes: bug 1221505
Fixes: bug 1225759
Change-Id: Id4c915d36d0da679b313dba8421ac621aeb7c818
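For context before the diff: the two engines touched below differ only in how graph nodes are dispatched. A minimal stdlib-only sketch of serial vs. parallel dispatch (illustrative names, not taskflow API):

import time
from concurrent import futures

def task(name):
    time.sleep(0.1)  # stand-in for real task work
    return name

names = ['a', 'b', 'c', 'd']

# Serial dispatch: one node at a time (the SingleThreadedActionEngine idea).
start = time.time()
print([task(n) for n in names], '%.2fs' % (time.time() - start))

# Parallel dispatch: independent nodes run concurrently on an executor
# (the MultiThreadedActionEngine idea).
start = time.time()
with futures.ThreadPoolExecutor(max_workers=4) as executor:
    print(list(executor.map(task, names)), '%.2fs' % (time.time() - start))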
@@ -40,6 +40,7 @@ class ActionEngine(object):
    Converts the flow into a recursive structure of actions.
    """
    _graph_action = None

    def __init__(self, flow, storage):
        self._failures = []
@@ -100,20 +101,23 @@ class ActionEngine(object):
        self.task_notifier.notify(state, details)

    def _translate_flow_to_action(self):
        # Flatten the flow into just 1 graph.
        assert self._graph_action is not None, ('Graph action class must be'
                                                ' specified')
        task_graph = flow_utils.flatten(self._flow)
-        ga = graph_action.SequentialGraphAction(task_graph)
+        ga = self._graph_action(task_graph)
        for n in task_graph.nodes_iter():
            ga.add(n, task_action.TaskAction(n, self))
        return ga

    @decorators.locked
    def compile(self):
        if self._root is None:
            self._root = self._translate_flow_to_action()


class SingleThreadedActionEngine(ActionEngine):
    # This one attempts to run in a serial manner.
    _graph_action = graph_action.SequentialGraphAction

    def __init__(self, flow, flow_detail=None, book=None, backend=None):
        if flow_detail is None:
            flow_detail = p_utils.create_flow_detail(flow,
@@ -124,6 +128,9 @@ class SingleThreadedActionEngine(ActionEngine):


class MultiThreadedActionEngine(ActionEngine):
    # This one attempts to run in a parallel manner.
    _graph_action = graph_action.ParallelGraphAction

    def __init__(self, flow, flow_detail=None, book=None, backend=None,
                 executor=None):
        if flow_detail is None:
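A standalone sketch of the strategy-selection pattern the hunks above introduce: the base class leaves _graph_action unset and asserts on it, while each subclass pins a concrete graph-action class (names here are hypothetical):

class Engine(object):
    _graph_action = None  # subclasses must pick a strategy class

    def compile(self, graph):
        assert self._graph_action is not None, ('Graph action class must be'
                                                ' specified')
        return self._graph_action(graph)

class SequentialAction(object):
    def __init__(self, graph):
        self.graph = graph

class SerialEngine(Engine):
    _graph_action = SequentialAction

print(type(SerialEngine().compile(['n1', 'n2'])).__name__)  # SequentialAction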
@@ -16,7 +16,17 @@
# License for the specific language governing permissions and limitations
# under the License.

import collections
import logging
import threading

from concurrent import futures

from taskflow.engines.action_engine import base_action as base
from taskflow import exceptions as exc
from taskflow.utils import misc

LOG = logging.getLogger(__name__)


class GraphAction(base.Action):
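The new imports above exist to support the coordination used in the next hunk: threading.Event serves as a shared kill-switch so workers stop volunteering for work once anything fails. A minimal sketch of that idea (illustrative only; a single worker keeps the outcome deterministic):

import threading
from concurrent import futures

has_failed = threading.Event()

def worker(i):
    if has_failed.is_set():
        return 'skipped %d' % i  # stand down once something has failed
    if i == 2:
        has_failed.set()  # tell everyone else to stop
        raise RuntimeError('boom from %d' % i)
    return 'done %d' % i

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    fs = [executor.submit(worker, i) for i in range(5)]
for f in fs:
    try:
        print(f.result())
    except RuntimeError as e:
        print('failed:', e)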
@@ -79,3 +89,99 @@ class SequentialGraphAction(GraphAction):
            action = self._action_mapping[node]
            action.revert(engine)  # raises on failure
            to_revert += self._resolve_dependencies(node, deps_counter, True)


class ParallelGraphAction(SequentialGraphAction):
    def execute(self, engine):
        """This action executes the provided graph in parallel by selecting
        nodes which can run (those whose dependencies are satisfied, or
        those with no dependencies) and submitting them to the executor to
        be run; this process is then repeated until no more nodes can be
        run (or a failure has occurred and all nodes were stopped from
        running further).
        """
        # A deque is a thread-safe push/pop/popleft/append implementation.
        all_futures = collections.deque()
        executor = engine.executor
        has_failed = threading.Event()
        deps_lock = threading.RLock()
        deps_counter = self._get_nodes_dependencies_count()

        def submit_followups(node):
            # Mutating the deps_counter isn't thread safe.
            with deps_lock:
                to_execute = self._resolve_dependencies(node, deps_counter)
            submit_count = 0
            for n in to_execute:
                try:
                    all_futures.append(executor.submit(run_node, n))
                    submit_count += 1
                except RuntimeError:
                    # Someone shut down the executor while we are still
                    # using it; get out as quickly as we can...
                    has_failed.set()
                    break
            return submit_count

        def run_node(node):
            if has_failed.is_set():
                # Someone failed; don't even bother running.
                return
            action = self._action_mapping[node]
            try:
                action.execute(engine)
            except Exception:
                # Make sure others don't continue working (although they
                # may already be actively working, but you can't stop that
                # anyway).
                has_failed.set()
                raise
            if has_failed.is_set():
                # Someone else failed; don't even bother submitting any
                # followup jobs.
                return
            # NOTE(harlowja): the future itself will not return until after
            # it submits followup tasks; this keeps the parent thread
            # waiting for more results, since the all_futures deque will
            # not be empty until everyone stops submitting followups.
            submitted = submit_followups(node)
            LOG.debug("After running %s, %s followup actions were submitted",
                      node, submitted)

        # Nothing to execute in the first place.
        if not deps_counter:
            return

        # Ensure that we hold the lock, just in case the submitted
        # functions immediately start submitting their own jobs (which
        # could happen if they are very quick).
        with deps_lock:
            to_execute = self._browse_nodes_to_execute(deps_counter)
            for n in to_execute:
                try:
                    all_futures.append(executor.submit(run_node, n))
                except RuntimeError:
                    # Someone shut down the executor while we are still
                    # using it; get out as quickly as we can...
                    break

        # Keep consuming the futures until there are no more futures left
        # to consume, so that we can gather their failures. Notice that
        # results are not captured, as results of tasks go into storage
        # and do not get returned here.
        failures = []
        while all_futures:
            # Take in FIFO order, not in LIFO order.
            f = all_futures.popleft()
            try:
                f.result()
            except futures.CancelledError:
                # TODO(harlowja): can we use the cancellation feature to
                # actually achieve cancellation in taskflow?
                pass
            except Exception:
                failures.append(misc.Failure())
        if len(failures) > 1:
            raise exc.LinkedException.link([fail.exc_info
                                            for fail in failures])
        elif len(failures) == 1:
            failures[0].reraise()
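Taken as a whole, execute() above is a dependency-counting scheduler: each finished node decrements its successors' counters, submits any that reach zero, and the parent thread drains the futures deque FIFO until no worker enqueues more. A self-contained sketch of the same scheme on a diamond graph (illustrative, not taskflow API):

import collections
import threading
from concurrent import futures

# A diamond graph: a -> (b, c) -> d.
successors = {'a': ['b', 'c'], 'b': ['d'], 'c': ['d'], 'd': []}
deps_counter = {'a': 0, 'b': 1, 'c': 1, 'd': 2}  # unmet dependencies

all_futures = collections.deque()  # thread-safe append/popleft
deps_lock = threading.RLock()
executor = futures.ThreadPoolExecutor(max_workers=2)

def run_node(node):
    print('ran', node)
    ready = []
    # Mutating the shared counters isn't thread safe, so lock around it.
    with deps_lock:
        for nxt in successors[node]:
            deps_counter[nxt] -= 1
            if deps_counter[nxt] == 0:
                ready.append(nxt)
    # Followups are submitted by the worker itself, as in the diff above.
    for nxt in ready:
        all_futures.append(executor.submit(run_node, nxt))

# Seed the nodes that start with no dependencies.
with deps_lock:
    for node, count in list(deps_counter.items()):
        if count == 0:
            all_futures.append(executor.submit(run_node, node))

# Drain FIFO; result() re-raises any worker failure (which the real code
# collects via misc.Failure and LinkedException instead).
while all_futures:
    all_futures.popleft().result()
executor.shutdown()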