diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 68bd3458..e7763068 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -19,6 +19,7 @@ import contextlib import threading from concurrent import futures +import networkx as nx from oslo_utils import excutils from oslo_utils import strutils import six @@ -28,12 +29,15 @@ from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc +from taskflow import logging from taskflow import states from taskflow import storage from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc +LOG = logging.getLogger(__name__) + @contextlib.contextmanager def _start_stop(executor): @@ -213,9 +217,16 @@ class ActionEngine(base.Engine): # flow/task provided or storage provided, if there are still missing # dependencies then this flow will fail at runtime (which we can avoid # by failing at validation time). + execution_graph = self._compilation.execution_graph + if LOG.isEnabledFor(logging.BLATHER): + LOG.blather("Validating scoping and argument visibility for" + " execution graph with %s nodes and %s edges with" + " density %0.3f", execution_graph.number_of_nodes(), + execution_graph.number_of_edges(), + nx.density(execution_graph)) missing = set() fetch = self.storage.fetch_unsatisfied_args - for node in self._compilation.execution_graph.nodes_iter(): + for node in execution_graph.nodes_iter(): missing.update(fetch(node.name, node.rebind, optional_args=node.optional)) if missing: