Refactor engine internals

In this change we:
- introduce a single graph action that works with futures
- split the methods of TaskAction into schedule/complete parts,
  with schedule* returning a future;
- the task executor becomes the entity that manages the Executor,
  instead of the engine + graph action.

Change-Id: Ic0704534391abdd566bf2d87adabef43a9c88cfb
This commit is contained in:
Ivan A. Melnikov
2013-12-19 10:24:41 +02:00
parent 2409a46295
commit c19758dd41
5 changed files with 92 additions and 176 deletions

View File

@@ -18,8 +18,6 @@
import threading
from concurrent import futures
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import task_action
@@ -34,7 +32,6 @@ from taskflow.utils import flow_utils
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
from taskflow.utils import threading_utils
class ActionEngine(base.EngineBase):
@@ -192,46 +189,19 @@ class ActionEngine(base.EngineBase):
class SingleThreadedActionEngine(ActionEngine):
# NOTE(harlowja): This one attempts to run in a serial manner.
_graph_action_cls = graph_action.SequentialGraphAction
_graph_action_cls = graph_action.FutureGraphAction
_storage_cls = t_storage.Storage
class MultiThreadedActionEngine(ActionEngine):
# NOTE(harlowja): This one attempts to run in a parallel manner.
_graph_action_cls = graph_action.ParallelGraphAction
_graph_action_cls = graph_action.FutureGraphAction
_storage_cls = t_storage.ThreadSafeStorage
def _task_executor_cls(self):
return executor.ParallelTaskExecutor(self._executor)
def __init__(self, flow, flow_detail, backend, conf):
self._executor = conf.get('executor', None)
super(MultiThreadedActionEngine, self).__init__(
flow, flow_detail, backend, conf)
self._executor = conf.get('executor', None)
@lock_utils.locked
def run(self):
if self._executor is None:
# NOTE(harlowja): since no executor was provided we have to create
# one, and also ensure that we shutdown the one we create to
# ensure that we don't leak threads.
thread_count = threading_utils.get_optimal_thread_count()
self._executor = futures.ThreadPoolExecutor(thread_count)
owns_executor = True
else:
owns_executor = False
try:
ActionEngine.run(self)
finally:
# Don't forget to shutdown the executor!!
if owns_executor:
try:
self._executor.shutdown(wait=True)
finally:
self._executor = None
@property
def executor(self):
"""Returns the current executor, if no executor is provided on
construction then this executor will change each time the engine
is ran.
"""
return self._executor

View File

@@ -22,6 +22,7 @@ import six
from concurrent import futures
from taskflow.utils import eventlet_utils as eu
from taskflow.utils import misc
from taskflow.utils import threading_utils
@@ -83,7 +84,7 @@ class TaskExecutorBase(object):
"""Schedules task reversion"""
@abc.abstractmethod
def wait_for_any(self, fs):
def wait_for_any(self, fs, timeout=None):
"""Wait for futures returned by this executor to complete"""
def start(self):
@@ -114,7 +115,7 @@ class SerialTaskExecutor(TaskExecutorBase):
_revert_task(task, arguments, result,
failures, progress_callback))
def wait_for_any(self, fs):
def wait_for_any(self, fs, timeout=None):
# NOTE(imelnikov): this executor returns only done futures
return fs, []
@@ -140,8 +141,8 @@ class ParallelTaskExecutor(TaskExecutorBase):
_revert_task, task,
arguments, result, failures, progress_callback)
def wait_for_any(self, fs):
return futures.wait(fs, return_when=futures.FIRST_COMPLETED)
def wait_for_any(self, fs, timeout=None):
return eu.wait_for_any(fs, timeout)
def start(self):
if self._own_executor:

View File

@@ -16,11 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import threading
from concurrent import futures
from taskflow import states as st
from taskflow.utils import misc
@@ -67,130 +63,75 @@ class GraphAction(object):
return deps_counter
class SequentialGraphAction(GraphAction):
_WAITING_TIMEOUT = 60 # in seconds
class FutureGraphAction(GraphAction):
"""Graph action build around futures returned by task action.
This graph action schedules all task it can for execution and than
waits on returned futures. If task executor is able to execute tasks
in parallel, this enables parallel flow run and reversion.
"""
def execute(self, engine):
deps_counter = self._get_nodes_dependencies_count()
to_execute = self._browse_nodes_to_execute(deps_counter)
was_suspended = self._run(engine, lambda: engine.is_running,
engine.task_action.schedule_execution,
engine.task_action.complete_execution,
revert=False)
while to_execute and engine.is_running:
node = to_execute.pop()
engine.task_action.execute(node)
to_execute += self._resolve_dependencies(node, deps_counter)
if to_execute:
return st.SUSPENDED
return st.SUCCESS
return st.SUSPENDED if was_suspended else st.SUCCESS
def revert(self, engine):
deps_counter = self._get_nodes_dependencies_count(True)
to_revert = self._browse_nodes_to_execute(deps_counter)
was_suspended = self._run(engine, lambda: engine.is_reverting,
engine.task_action.schedule_reversion,
engine.task_action.complete_reversion,
revert=True)
return st.SUSPENDED if was_suspended else st.REVERTED
while to_revert and engine.is_reverting:
node = to_revert.pop()
engine.task_action.revert(node)
to_revert += self._resolve_dependencies(node, deps_counter, True)
def _run(self, engine, running, schedule_node, complete_node, revert):
deps_counter = self._get_nodes_dependencies_count(revert)
not_done = []
if to_revert:
return st.SUSPENDED
return st.REVERTED
class ParallelGraphAction(SequentialGraphAction):
def execute(self, engine):
"""This action executes the provided graph in parallel by selecting
nodes which can run (those which have there dependencies satisfied
or those with no dependencies) and submitting them to the executor
to be ran, and then after running this process will be repeated until
no more nodes can be ran (or a failure has a occurred and all nodes
were stopped from further running).
"""
# A deque is a thread safe push/pop/popleft/append implementation
all_futures = collections.deque()
executor = engine.executor
has_failed = threading.Event()
deps_lock = threading.RLock()
deps_counter = self._get_nodes_dependencies_count()
was_suspended = threading.Event()
def submit_followups(node):
# Mutating the deps_counter isn't thread safe.
with deps_lock:
to_execute = self._resolve_dependencies(node, deps_counter)
submit_count = 0
for n in to_execute:
try:
all_futures.append(executor.submit(run_node, n))
submit_count += 1
except RuntimeError:
# Someone shutdown the executor while we are still
# using it, get out as quickly as we can...
has_failed.set()
break
return submit_count
def run_node(node):
if has_failed.is_set():
# Someone failed, don't even bother running.
return
try:
if engine.is_running:
engine.task_action.execute(node)
def schedule(nodes):
for node in nodes:
future = schedule_node(node)
if future is not None:
not_done.append(future)
else:
was_suspended.set()
return
except Exception:
# Make sure others don't continue working (although they may
# be already actively working, but u can't stop that anyway).
has_failed.set()
raise
if has_failed.is_set():
# Someone else failed, don't even bother submitting any
# followup jobs.
return
# NOTE(harlowja): the future itself will not return until after it
# submits followup tasks, this keeps the parent thread waiting for
# more results since the all_futures deque will not be empty until
# everyone stops submitting followups.
submitted = submit_followups(node)
LOG.debug("After running %s, %s followup actions were submitted",
node, submitted)
schedule(self._resolve_dependencies(
node, deps_counter, revert))
# Nothing to execute in the first place
if not deps_counter:
return st.SUCCESS
# Ensure that we obtain the lock just in-case the functions submitted
# immediately themselves start submitting there own jobs (which could
# happen if they are very quick).
with deps_lock:
to_execute = self._browse_nodes_to_execute(deps_counter)
for n in to_execute:
try:
all_futures.append(executor.submit(run_node, n))
except RuntimeError:
# Someone shutdown the executor while we are still using
# it, get out as quickly as we can....
break
# Keep on continuing to consume the futures until there are no more
# futures to consume so that we can get there failures. Notice that
# results are not captured, as results of tasks go into storage and
# do not get returned here.
schedule(self._browse_nodes_to_execute(deps_counter))
failures = []
while len(all_futures):
# Take in FIFO order, not in LIFO order.
f = all_futures.popleft()
try:
f.result()
except futures.CancelledError:
# TODO(harlowja): can we use the cancellation feature to
# actually achieve cancellation in taskflow??
pass
except Exception:
failures.append(misc.Failure())
was_suspended = False
while not_done:
# NOTE(imelnikov): if timeout occurs before any of futures
# completes, done list will be empty and we'll just go
# for next iteration
done, not_done = engine.task_action.wait_for_any(
not_done, _WAITING_TIMEOUT)
not_done = list(not_done)
next_nodes = []
for future in done:
node, _event, result = future.result()
complete_node(node, result)
if isinstance(result, misc.Failure):
failures.append(result)
else:
next_nodes.extend(self._resolve_dependencies(
node, deps_counter, revert))
if next_nodes:
if running() and not failures:
schedule(next_nodes)
else:
# NOTE(imelnikov): engine stopped while there were
# still some tasks to do, so we either failed
# or were suspended
was_suspended = True
misc.Failure.reraise_if_any(failures)
if was_suspended.is_set():
return st.SUSPENDED
else:
return st.SUCCESS
return was_suspended

View File

@@ -62,21 +62,21 @@ class TaskAction(object):
LOG.exception("Failed setting task progress for %s to %0.3f",
task, progress)
def execute(self, task):
def schedule_execution(self, task):
if not self._change_state(task, states.RUNNING, progress=0.0):
return
kwargs = self._storage.fetch_mapped_args(task.rebind)
future = self._task_executor.execute_task(task, kwargs,
self._on_update_progress)
self._task_executor.wait_for_any(future)
_task, _event, result = future.result()
return self._task_executor.execute_task(task, kwargs,
self._on_update_progress)
def complete_execution(self, task, result):
if isinstance(result, misc.Failure):
self._change_state(task, states.FAILURE, result=result)
result.reraise()
self._change_state(task, states.SUCCESS, result=result, progress=1.0)
else:
self._change_state(task, states.SUCCESS,
result=result, progress=1.0)
def revert(self, task):
def schedule_reversion(self, task):
if not self._change_state(task, states.REVERTING, progress=0.0):
return
kwargs = self._storage.fetch_mapped_args(task.rebind)
@@ -85,9 +85,13 @@ class TaskAction(object):
future = self._task_executor.revert_task(task, kwargs,
task_result, failures,
self._on_update_progress)
self._task_executor.wait_for_any(future)
_task, _event, result = future.result()
if isinstance(result, misc.Failure):
return future
def complete_reversion(self, task, rev_result):
if isinstance(rev_result, misc.Failure):
self._change_state(task, states.FAILURE)
result.reraise()
self._change_state(task, states.REVERTED, progress=1.0)
else:
self._change_state(task, states.REVERTED, progress=1.0)
def wait_for_any(self, fs, timeout):
return self._task_executor.wait_for_any(fs, timeout)

View File

@@ -598,7 +598,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.MultiThreadedActionEngine)
self.assertIs(engine.executor, None)
self.assertIs(engine._executor, None)
def test_using_common_executor(self):
flow = utils.TaskNoRequiresNoReturns(name='task1')
@@ -606,7 +606,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
try:
e1 = self._make_engine(flow, executor=executor)
e2 = self._make_engine(flow, executor=executor)
self.assertIs(e1.executor, e2.executor)
self.assertIs(e1._executor, e2._executor)
finally:
executor.shutdown(wait=True)