diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 09495d4c..ae3677ff 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -19,11 +19,13 @@ import threading from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import graph_analyzer +from taskflow.engines.action_engine import retry_action from taskflow.engines.action_engine import task_action from taskflow.engines import base from taskflow import exceptions as exc from taskflow.openstack.common import excutils +from taskflow import retry from taskflow import states from taskflow import storage as t_storage @@ -50,6 +52,7 @@ class ActionEngine(base.EngineBase): _graph_analyzer_cls = graph_analyzer.GraphAnalyzer _task_action_cls = task_action.TaskAction _task_executor_cls = executor.SerialTaskExecutor + _retry_action_cls = retry_action.RetryAction def __init__(self, flow, flow_detail, backend, conf): super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) @@ -60,6 +63,7 @@ class ActionEngine(base.EngineBase): self._state_lock = threading.RLock() self._task_executor = None self._task_action = None + self._retry_action = None def _revert(self, current_failure=None): self._change_state(states.REVERTING) @@ -150,24 +154,27 @@ class ActionEngine(base.EngineBase): self.task_notifier.notify(states.PENDING, details) self._change_state(states.PENDING) - def _ensure_storage_for(self, task_graph): + def _ensure_storage_for(self, execution_graph): # NOTE(harlowja): signal to the tasks that exist that we are about to # resume, if they have a previous state, they will now transition to # a resuming state (and then to suspended). self._change_state(states.RESUMING) # does nothing in PENDING state - for task in task_graph.nodes_iter(): - task_version = misc.get_version_string(task) - self.storage.ensure_task(task.name, task_version, task.save_as) + for node in execution_graph.nodes_iter(): + version = misc.get_version_string(node) + if isinstance(node, retry.Retry): + self.storage.ensure_retry(node.name, version, node.save_as) + else: + self.storage.ensure_task(node.name, version, node.save_as) self._change_state(states.SUSPENDED) # does nothing in PENDING state @lock_utils.locked def compile(self): if self._compiled: return - task_graph = flow_utils.flatten(self._flow) - if task_graph.number_of_nodes() == 0: + execution_graph = flow_utils.flatten(self._flow) + if execution_graph.number_of_nodes() == 0: raise exc.EmptyFlow("Flow %s is empty." % self._flow.name) - self._analyzer = self._graph_analyzer_cls(task_graph, + self._analyzer = self._graph_analyzer_cls(execution_graph, self.storage) if self._task_executor is None: self._task_executor = self._task_executor_cls() @@ -175,15 +182,19 @@ class ActionEngine(base.EngineBase): self._task_action = self._task_action_cls(self.storage, self._task_executor, self.task_notifier) + if self._retry_action is None: + self._retry_action = self._retry_action_cls(self.storage, + self.task_notifier) self._root = self._graph_action_cls(self._analyzer, self.storage, - self._task_action) + self._task_action, + self._retry_action) # NOTE(harlowja): Perform initial state manipulation and setup. # # TODO(harlowja): This doesn't seem like it should be in a compilation # function since compilation seems like it should not modify any # external state. - self._ensure_storage_for(task_graph) + self._ensure_storage_for(execution_graph) self._compiled = True diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 8a7115bd..da5a3f25 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -15,6 +15,7 @@ # under the License. from taskflow import states as st +from taskflow import task from taskflow.utils import misc @@ -29,10 +30,11 @@ class FutureGraphAction(object): in parallel, this enables parallel flow run and reversion. """ - def __init__(self, analyzer, storage, task_action): + def __init__(self, analyzer, storage, task_action, retry_action): self._analyzer = analyzer self._storage = storage self._task_action = task_action + self._retry_action = retry_action def is_running(self): return self._storage.get_flow_state() == st.RUNNING @@ -45,6 +47,7 @@ class FutureGraphAction(object): self.is_running, self._task_action.schedule_execution, self._task_action.complete_execution, + self._retry_action.execute, self._analyzer.browse_nodes_for_execute) return st.SUSPENDED if was_suspended else st.SUCCESS @@ -53,14 +56,21 @@ class FutureGraphAction(object): self.is_reverting, self._task_action.schedule_reversion, self._task_action.complete_reversion, + self._retry_action.revert, self._analyzer.browse_nodes_for_revert) return st.SUSPENDED if was_suspended else st.REVERTED - def _run(self, running, schedule_node, complete_node, get_next_nodes): + def _run(self, running, schedule_task, complete_task, + complete_retry, get_next_nodes): def schedule(nodes, not_done): for node in nodes: - future = schedule_node(node) + if isinstance(node, task.BaseTask): + future = schedule_task(node) + else: + # Retry controller is always executed immediately in the + # main thread and it should not be scheduled. + future = complete_retry(node) if future is not None: not_done.append(future) else: @@ -82,7 +92,8 @@ class FutureGraphAction(object): # NOTE(harlowja): event will be used in the future for smart # reversion (ignoring it for now). node, _event, result = future.result() - complete_node(node, result) + if isinstance(node, task.BaseTask): + complete_task(node, result) if isinstance(result, misc.Failure): failures.append(result) else: diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py new file mode 100644 index 00000000..68f93282 --- /dev/null +++ b/taskflow/engines/action_engine/retry_action.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from taskflow import states +from taskflow.utils import misc + +LOG = logging.getLogger(__name__) + +SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) + + +class RetryAction(object): + def __init__(self, storage, notifier): + self._storage = storage + self._notifier = notifier + + def _get_retry_args(self, retry): + kwargs = self._storage.fetch_mapped_args(retry.rebind) + kwargs['history'] = self._storage.get_retry_history(retry.name) + return kwargs + + def _change_state(self, retry, state, result=None): + old_state = self._storage.get_task_state(retry.name) + if not states.check_task_transition(old_state, state): + return False + if state in SAVE_RESULT_STATES: + self._storage.save(retry.name, result, state) + elif state == states.REVERTED: + self.storage.cleanup_retry_history(retry.name, state) + else: + self._storage.set_task_state(retry.name, state) + + retry_uuid = self._storage.get_task_uuid(retry.name) + details = dict(retry_name=retry.name, + retry_uuid=retry_uuid, + result=result) + self._notifier.notify(state, details) + return True + + def execute(self, retry): + if not self._change_state(retry, states.RUNNING): + return + kwargs = self._get_retry_args(retry) + try: + result = retry.execute(**kwargs) + except Exception: + result = misc.Failure() + self._change_state(retry, states.FAILURE, result=result) + else: + self._change_state(retry, states.SUCCESS, result=result) + + def revert(self, retry): + if not self._change_state(retry, states.REVERTING): + return + kwargs = self._get_retry_args(retry) + kwargs['flow_failures'] = self._storage.get_failures() + try: + retry.revert(**kwargs) + except Exception: + self._change_state(retry, states.FAILURE) + else: + self._change_state(retry, states.REVERTED) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py new file mode 100644 index 00000000..81d260d4 --- /dev/null +++ b/taskflow/tests/unit/test_retries.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from taskflow.patterns import graph_flow as gf +from taskflow.patterns import linear_flow as lf +from taskflow.patterns import unordered_flow as uf + +import taskflow.engines + +from taskflow import test +from taskflow.tests import utils + +from taskflow.utils import eventlet_utils as eu + + +class RetryTest(utils.EngineTestBase): + + def test_run_empty_linear_flow(self): + flow = lf.Flow('flow-1', utils.OneReturnRetry(provides='x')) + engine = self._make_engine(flow) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'x': 1}) + + def test_run_empty_unordered_flow(self): + flow = uf.Flow('flow-1', utils.OneReturnRetry(provides='x')) + engine = self._make_engine(flow) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'x': 1}) + + def test_run_empty_graph_flow(self): + flow = gf.Flow('flow-1', utils.OneReturnRetry(provides='x')) + engine = self._make_engine(flow) + engine.run() + self.assertEqual(engine.storage.fetch_all(), {'x': 1}) + + +class SingleThreadedEngineTest(RetryTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None): + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine_conf='serial', + backend=self.backend) + + +class MultiThreadedEngineTest(RetryTest, + test.TestCase): + def _make_engine(self, flow, flow_detail=None, executor=None): + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) + + +@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +class ParallelEngineWithEventletTest(RetryTest, + test.TestCase): + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = eu.GreenExecutor() + engine_conf = dict(engine='parallel', + executor=executor) + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine_conf=engine_conf, + backend=self.backend) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 0ce356aa..d287a4b1 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -20,6 +20,7 @@ import six from taskflow import exceptions from taskflow.persistence.backends import impl_memory +from taskflow import retry from taskflow import task from taskflow.utils import misc @@ -269,3 +270,12 @@ class FailureMatcher(object): def __eq__(self, other): return self._failure.matches(other) + + +class OneReturnRetry(retry.AlwaysRevert): + + def execute(self, **kwargs): + return 1 + + def revert(self, **kwargs): + pass