Merge "Make schedule a proper method of GraphAction"
This commit is contained in:
@@ -41,28 +41,31 @@ class FutureGraphAction(object):
|
|||||||
def is_running(self):
|
def is_running(self):
|
||||||
return self._storage.get_flow_state() == st.RUNNING
|
return self._storage.get_flow_state() == st.RUNNING
|
||||||
|
|
||||||
|
def _schedule(self, nodes):
|
||||||
|
"""Schedule nodes for execution.
|
||||||
|
|
||||||
|
Returns list of futures.
|
||||||
|
"""
|
||||||
|
futures = []
|
||||||
|
for node in nodes:
|
||||||
|
if isinstance(node, task.BaseTask):
|
||||||
|
future = self._schedule_task(node)
|
||||||
|
elif isinstance(node, r.Retry):
|
||||||
|
future = self._schedule_retry(node)
|
||||||
|
else:
|
||||||
|
raise TypeError("Unknown how to schedule node %s" % node)
|
||||||
|
if future is not None:
|
||||||
|
futures.append(future)
|
||||||
|
else:
|
||||||
|
next_nodes = self._analyzer.get_next_nodes(node)
|
||||||
|
futures.extend(self._schedule(next_nodes))
|
||||||
|
return futures
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
|
|
||||||
def schedule(nodes, not_done):
|
|
||||||
for node in nodes:
|
|
||||||
# Returns schedule function for current atom and
|
|
||||||
# executes scheduling
|
|
||||||
if isinstance(node, task.BaseTask):
|
|
||||||
future = self._schedule_task(node)
|
|
||||||
elif isinstance(node, r.Retry):
|
|
||||||
future = self._schedule_retry(node)
|
|
||||||
else:
|
|
||||||
raise TypeError("Unknown how to schedule node %s" % node)
|
|
||||||
if future is not None:
|
|
||||||
not_done.append(future)
|
|
||||||
else:
|
|
||||||
schedule(self._analyzer.get_next_nodes(node), not_done)
|
|
||||||
|
|
||||||
not_done = []
|
|
||||||
# Prepare flow to be resumed
|
# Prepare flow to be resumed
|
||||||
next_nodes = self._prepare_flow_for_resume()
|
next_nodes = self._prepare_flow_for_resume()
|
||||||
next_nodes.update(self._analyzer.get_next_nodes())
|
next_nodes.update(self._analyzer.get_next_nodes())
|
||||||
schedule(next_nodes, not_done)
|
not_done = self._schedule(next_nodes)
|
||||||
|
|
||||||
failures = []
|
failures = []
|
||||||
while not_done:
|
while not_done:
|
||||||
@@ -85,7 +88,7 @@ class FutureGraphAction(object):
|
|||||||
next_nodes.update(self._analyzer.get_next_nodes(node))
|
next_nodes.update(self._analyzer.get_next_nodes(node))
|
||||||
|
|
||||||
if next_nodes and not failures and self.is_running():
|
if next_nodes and not failures and self.is_running():
|
||||||
schedule(next_nodes, not_done)
|
not_done.extend(self._schedule(next_nodes))
|
||||||
|
|
||||||
if failures:
|
if failures:
|
||||||
misc.Failure.reraise_if_any(failures)
|
misc.Failure.reraise_if_any(failures)
|
||||||
|
|||||||
Reference in New Issue
Block a user