diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index ae7f9019..b27b69d0 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -42,29 +42,34 @@ class FutureGraphAction(object): def is_running(self): return self._storage.get_flow_state() == st.RUNNING - def _schedule(self, nodes): - """Schedule nodes for execution. + def _schedule_node(self, node): + """Schedule a single node for execution.""" + if isinstance(node, task.BaseTask): + return self._schedule_task(node) + elif isinstance(node, r.Retry): + return self._schedule_retry(node) + else: + raise TypeError("Unknown how to schedule node %s" % node) - Returns list of futures. - """ + def _schedule(self, nodes): + """Schedule a group of nodes for execution.""" futures = [] for node in nodes: - if isinstance(node, task.BaseTask): - future = self._schedule_task(node) - elif isinstance(node, r.Retry): - future = self._schedule_retry(node) - else: - raise TypeError("Unknown how to schedule node %s" % node) - futures.append(future) - return futures + try: + futures.append(self._schedule_node(node)) + except Exception: + # Immediately stop scheduling future work so that we can + # exit execution early (rather than later) if a single task + # fails to schedule correctly. + return (futures, [misc.Failure()]) + return (futures, []) def execute(self): # Prepare flow to be resumed next_nodes = self._prepare_flow_for_resume() next_nodes.update(self._analyzer.get_next_nodes()) - not_done = self._schedule(next_nodes) + not_done, failures = self._schedule(next_nodes) - failures = [] while not_done: # NOTE(imelnikov): if timeout occurs before any of futures # completes, done list will be empty and we'll just go @@ -72,24 +77,36 @@ class FutureGraphAction(object): done, not_done = self._task_action.wait_for_any( not_done, _WAITING_TIMEOUT) + # Analyze the results and schedule more nodes (unless we had + # failures). If failures occured just continue processing what + # is running (so that we don't leave it abandoned) but do not + # schedule anything new. next_nodes = set() for future in done: - node, event, result = future.result() - if isinstance(node, task.BaseTask): - self._complete_task(node, event, result) - if isinstance(result, misc.Failure): - if event == ex.EXECUTED: - self._process_atom_failure(node, result) + try: + node, event, result = future.result() + if isinstance(node, task.BaseTask): + self._complete_task(node, event, result) + if isinstance(result, misc.Failure): + if event == ex.EXECUTED: + self._process_atom_failure(node, result) + else: + failures.append(result) + except Exception: + failures.append(misc.Failure()) + else: + try: + more_nodes = self._analyzer.get_next_nodes(node) + except Exception: + failures.append(misc.Failure()) else: - failures.append(result) - next_nodes.update(self._analyzer.get_next_nodes(node)) - + next_nodes.update(more_nodes) if next_nodes and not failures and self.is_running(): - not_done.extend(self._schedule(next_nodes)) + more_not_done, failures = self._schedule(next_nodes) + not_done.extend(more_not_done) if failures: misc.Failure.reraise_if_any(failures) - if self._analyzer.get_next_nodes(): return st.SUSPENDED elif self._analyzer.is_success():