Files
deb-python-taskflow/taskflow/engines/action_engine/graph_action.py
Joshua Harlow 53dcbd4d97 Implement run iterations
Instead of blocking the caller when they call run(),
add a new API, run_iter(), that will yield
back the engine state transitions while running. This
allows an engine user to do alternate work while an
engine is running (and come back to iterate on their own
time).

Implements blueprint iterable-execution

Change-Id: Ibb48c6c5618c97c59a6ab170dab5233ed47e5554
2014-05-07 14:41:17 -07:00

234 lines
9.3 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.engines.action_engine import executor as ex
from taskflow import exceptions as excp
from taskflow import retry as r
from taskflow import states as st
from taskflow import task
from taskflow.utils import misc
# Default timeout (in seconds) passed to each wait on outstanding futures
# when execute_iter() is called without an explicit timeout.
_WAITING_TIMEOUT = 60 # in seconds
class FutureGraphAction(object):
    """Graph action built around futures returned by task action.

    This graph action schedules all tasks it can for execution and then
    waits on the returned futures. If the task executor is able to execute
    tasks in parallel, this enables parallel flow run and reversion.
    """

    # Informational states this action yields while running, not useful to
    # have the engine record but useful to provide to end-users when doing
    # execution iterations.
    ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)

    def __init__(self, analyzer, storage, task_action, retry_action):
        """Store the collaborators used to analyze, persist and run nodes.

        :param analyzer: queried for next nodes, node states, subgraphs and
                         retry controllers
        :param storage: queried/updated for flow state, atom intentions and
                        stored atom results
        :param task_action: used to schedule, wait on and complete tasks
        :param retry_action: used to execute/revert retry controllers and to
                             ask them how to handle a failure
        """
        self._analyzer = analyzer
        self._storage = storage
        self._task_action = task_action
        self._retry_action = retry_action

    def is_running(self):
        """Return True when the flow state held in storage is RUNNING."""
        return self._storage.get_flow_state() == st.RUNNING

    def _schedule_node(self, node):
        """Schedule a single node for execution.

        :returns: whatever the task/retry scheduling call returns (consumed
                  as a future by execute_iter)
        :raises TypeError: if the node is neither a task nor a retry
        """
        if isinstance(node, task.BaseTask):
            return self._schedule_task(node)
        elif isinstance(node, r.Retry):
            return self._schedule_retry(node)
        else:
            # This branch receives arbitrary objects; wrap the node in a
            # one-element tuple so a tuple-valued node is still formatted
            # into the message instead of breaking the '%' operator.
            raise TypeError("Unknown how to schedule node %s" % (node,))

    def _schedule(self, nodes):
        """Schedule a group of nodes for execution.

        :returns: a ``(futures, failures)`` pair; ``failures`` is empty
                  unless scheduling some node raised, in which case it holds
                  a single captured failure and ``futures`` holds only what
                  was scheduled before the error
        """
        futures = []
        for node in nodes:
            try:
                futures.append(self._schedule_node(node))
            except Exception:
                # Immediately stop scheduling future work so that we can
                # exit execution early (rather than later) if a single task
                # fails to schedule correctly.
                return (futures, [misc.Failure()])
        return (futures, [])

    def execute_iter(self, timeout=None):
        """Run the graph as a generator that yields state transitions.

        Yields RESUMING and SCHEDULING first, then WAITING and ANALYZING for
        every wait on outstanding futures (plus SCHEDULING whenever more
        nodes are about to be scheduled), and finally exactly one of
        SUSPENDED, SUCCESS or REVERTED.

        :param timeout: timeout handed to each wait on outstanding futures;
                        ``_WAITING_TIMEOUT`` is used when it is None
        :raises: the collected failures (via
                 ``misc.Failure.reraise_if_any``) once nothing is left
                 running, if any failure was recorded
        """
        if timeout is None:
            timeout = _WAITING_TIMEOUT

        # Prepare flow to be resumed
        yield st.RESUMING
        next_nodes = self._prepare_flow_for_resume()
        next_nodes.update(self._analyzer.get_next_nodes())

        # Schedule nodes to be worked on
        yield st.SCHEDULING
        if self.is_running():
            not_done, failures = self._schedule(next_nodes)
        else:
            not_done, failures = ([], [])

        # Run!
        #
        # At this point we need to ensure we wait for all active nodes to
        # finish running (even if we are asked to suspend) since we can not
        # preempt those tasks (maybe in the future we will be better able to
        # do this).
        while not_done:
            yield st.WAITING
            # TODO(harlowja): maybe we should start doing 'yield from' this
            # call sometime in the future, or equivalent that will work in
            # py2 and py3.
            done, not_done = self._task_action.wait_for_any(not_done, timeout)

            # Analyze the results and schedule more nodes (unless we had
            # failures). If failures occurred just continue processing what
            # is running (so that we don't leave it abandoned) but do not
            # schedule anything new.
            yield st.ANALYZING
            next_nodes = set()
            for future in done:
                try:
                    node, event, result = future.result()
                    if isinstance(node, task.BaseTask):
                        self._complete_task(node, event, result)
                    if isinstance(result, misc.Failure):
                        if event == ex.EXECUTED:
                            # A failed execution is handed to the retry
                            # logic, which adjusts atom intentions.
                            self._process_atom_failure(node, result)
                        else:
                            # Any other event that failed is recorded and
                            # re-raised once everything has finished.
                            failures.append(result)
                except Exception:
                    failures.append(misc.Failure())
                else:
                    try:
                        more_nodes = self._analyzer.get_next_nodes(node)
                    except Exception:
                        failures.append(misc.Failure())
                    else:
                        next_nodes.update(more_nodes)
            if next_nodes and not failures and self.is_running():
                yield st.SCHEDULING
                # Recheck incase someone suspended it.
                if self.is_running():
                    # 'failures' is known to be empty here (checked above),
                    # so rebinding it does not drop any recorded failure.
                    more_not_done, failures = self._schedule(next_nodes)
                    # Copy into a list before extending so this does not
                    # depend on wait_for_any() handing back a mutable list
                    # (a set or tuple of pending futures works as well).
                    not_done = list(not_done)
                    not_done.extend(more_not_done)

        if failures:
            misc.Failure.reraise_if_any(failures)

        if self._analyzer.get_next_nodes():
            # Nodes remain but nothing is running anymore.
            yield st.SUSPENDED
        elif self._analyzer.is_success():
            yield st.SUCCESS
        else:
            yield st.REVERTED

    def _schedule_task(self, task):
        """Schedule the given task for revert or execute depending on its
        intention.

        :raises excp.ExecutionFailure: if the stored intention is neither
                                       EXECUTE nor REVERT
        """
        intention = self._storage.get_atom_intention(task.name)
        if intention == st.EXECUTE:
            return self._task_action.schedule_execution(task)
        elif intention == st.REVERT:
            return self._task_action.schedule_reversion(task)
        else:
            raise excp.ExecutionFailure("Unknown how to schedule task with"
                                        " intention: %s" % intention)

    def _complete_task(self, task, event, result):
        """Complete the given task.

        An EXECUTED event completes the task's execution; any other event
        completes its reversion.
        """
        if event == ex.EXECUTED:
            self._task_action.complete_execution(task, result)
        else:
            self._task_action.complete_reversion(task, result)

    def _schedule_retry(self, retry):
        """Schedule the given retry for revert or execute depending on its
        intention.

        A RETRY intention first moves the retry to RETRYING and resets its
        subflow before executing the retry again.

        :raises excp.ExecutionFailure: if the stored intention is not one
                                       of EXECUTE, REVERT or RETRY
        """
        intention = self._storage.get_atom_intention(retry.name)
        if intention == st.EXECUTE:
            return self._retry_action.execute(retry)
        elif intention == st.REVERT:
            return self._retry_action.revert(retry)
        elif intention == st.RETRY:
            self._retry_action.change_state(retry, st.RETRYING)
            self._retry_subflow(retry)
            return self._retry_action.execute(retry)
        else:
            raise excp.ExecutionFailure("Unknown how to schedule retry with"
                                        " intention: %s" % intention)

    def _process_atom_failure(self, atom, failure):
        """On atom failure find its retry controller, ask it which action to
        perform with the failed subflow and set the proper intention for the
        affected nodes.

        When the atom has no retry controller the whole flow is marked for
        revert.
        """
        retry = self._analyzer.find_atom_retry(atom)
        if retry:
            # Ask retry controller what to do in case of failure
            action = self._retry_action.on_failure(retry, atom, failure)
            if action == r.RETRY:
                # Prepare subflow for revert
                self._storage.set_atom_intention(retry.name, st.RETRY)
                for node in self._analyzer.iterate_subgraph(retry):
                    self._storage.set_atom_intention(node.name, st.REVERT)
            elif action == r.REVERT:
                # Ask parent checkpoint
                self._process_atom_failure(retry, failure)
            elif action == r.REVERT_ALL:
                # Prepare all flow for revert
                self._revert_all()
        else:
            self._revert_all()

    def _revert_all(self):
        """Set the REVERT intention on every node known to the analyzer."""
        for node in self._analyzer.iterate_all_nodes():
            self._storage.set_atom_intention(node.name, st.REVERT)

    def _prepare_flow_for_resume(self):
        """Bring stored state in line before (re)starting and return the
        nodes that need to be scheduled again.

        Nodes in the FAILURE state get their failure (re)processed, retries
        in the RETRYING state get their subflow reset, and every node found
        in the RUNNING or REVERTING state is returned.

        :returns: a set of nodes found in the RUNNING or REVERTING state
        """
        for node in self._analyzer.iterate_all_nodes():
            if self._analyzer.get_state(node) == st.FAILURE:
                self._process_atom_failure(node, self._storage.get(node.name))
        for retry in self._analyzer.iterate_retries(st.RETRYING):
            self._retry_subflow(retry)
        next_nodes = set()
        for node in self._analyzer.iterate_all_nodes():
            if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
                next_nodes.add(node)
        return next_nodes

    def reset_all(self):
        """Reset every node to PENDING with an EXECUTE intention."""
        self._retry_subflow(None)

    def _retry_subflow(self, retry):
        """Reset nodes to PENDING and give them the EXECUTE intention.

        :param retry: retry controller whose subgraph is reset (the retry
                      itself also gets the EXECUTE intention); when None all
                      nodes known to the analyzer are reset instead
        """
        if retry is not None:
            self._storage.set_atom_intention(retry.name, st.EXECUTE)
            nodes_iter = self._analyzer.iterate_subgraph(retry)
        else:
            nodes_iter = self._analyzer.iterate_all_nodes()

        for node in nodes_iter:
            if isinstance(node, task.BaseTask):
                # Tasks additionally have their progress reset.
                self._task_action.change_state(node, st.PENDING, progress=0.0)
            else:
                self._retry_action.change_state(node, st.PENDING)
            self._storage.set_atom_intention(node.name, st.EXECUTE)