diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index e38864277..758d3f5c0 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -23,6 +23,7 @@ from taskflow.engines.action_engine import task_action from taskflow import blocks from taskflow import states +from taskflow import storage class ActionEngine(object): @@ -34,6 +35,7 @@ class ActionEngine(object): def __init__(self, flow, action_map): self._action_map = action_map self._root = self._to_action(flow) + self.storage = storage.Storage() def _to_action(self, pattern): try: diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index efb2fd735..d5b0e52db 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -18,8 +18,7 @@ from taskflow.engines.action_engine import base_action as base from taskflow import states - -import sys +from taskflow.utils import misc class TaskAction(base.Action): @@ -28,6 +27,7 @@ class TaskAction(base.Action): self._task = block.task if isinstance(self._task, type): self._task = self._task() + self._id = block.uuid self.state = states.PENDING def execute(self, engine): @@ -35,10 +35,12 @@ class TaskAction(base.Action): self.state = states.RUNNING try: # TODO(imelnikov): pass only necessary args to task - self._task.execute() + result = self._task.execute() except Exception: - # TODO(imelnikov): save exception information - print sys.exc_info() + result = misc.Failure() + + engine.storage.save(self._id, result) + if isinstance(result, misc.Failure): self.state = states.FAILURE else: self.state = states.SUCCESS @@ -51,9 +53,10 @@ class TaskAction(base.Action): # task a chance for cleanup return try: - self._task.revert() + self._task.revert(result=engine.storage.get(self._id)) except Exception: self.state = states.FAILURE raise else: + engine.storage.reset(self._id) self.state = states.PENDING diff --git a/taskflow/storage.py b/taskflow/storage.py new file mode 100644 index 000000000..bf2920a64 --- /dev/null +++ b/taskflow/storage.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from taskflow import exceptions + + +class Storage(object): + """Manages task results""" + + # TODO(imelnikov): this should be implemented on top of logbook + + def __init__(self): + self._task_results = {} + + def save(self, uuid, data): + """Put result for task with id 'uuid' to storage""" + self._task_results[uuid] = data + + def get(self, uuid): + """Get result for task with id 'uuid' to storage""" + try: + return self._task_results[uuid] + except KeyError: + raise exceptions.NotFound("Result for task %r is not known" + % uuid) + + def reset(self, uuid): + """Remove result for task with id 'uuid' from storage""" + try: + del self._task_results[uuid] + except KeyError: + pass diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index ef104399d..311da3797 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -34,7 +34,8 @@ class TestTask(task.Task): return 5 def revert(self, **kwargs): - self.values.append(self.name + ' reverted') + self.values.append(self.name + ' reverted(%s)' + % kwargs.get('result')) class FailingTask(TestTask): @@ -71,8 +72,10 @@ class EngineTaskTest(EngineTestBase): def test_run_task_as_flow(self): flow = blocks.Task(TestTask(self.values, name='task1')) - self._make_engine(flow).run() + engine = self._make_engine(flow) + engine.run() self.assertEquals(self.values, ['task1']) + self.assertEquals(engine.storage.get(flow.uuid), 5) def test_invalid_block_raises(self): value = 'i am string, not block, sorry' @@ -124,7 +127,8 @@ class EngineLinearFlowTest(EngineTestBase): blocks.Task(NeverRunningTask) ) self._make_engine(flow).run() - self.assertEquals(self.values, ['fail reverted']) + self.assertEquals(self.values, + ['fail reverted(Failure: RuntimeError: Woot!)']) def test_correctly_reverts_children(self): flow = blocks.LinearFlow().add( @@ -136,9 +140,10 @@ class EngineLinearFlowTest(EngineTestBase): ) engine = self._make_engine(flow) engine.run() - self.assertEquals(self.values, ['task1', 'task2', - 'fail reverted', - 'task2 reverted', 'task1 reverted']) + self.assertEquals(self.values, + ['task1', 'task2', + 'fail reverted(Failure: RuntimeError: Woot!)', + 'task2 reverted(5)', 'task1 reverted(5)']) class SingleThreadedEngineTest(EngineTaskTest, diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py new file mode 100644 index 000000000..a51506b57 --- /dev/null +++ b/taskflow/tests/unit/test_storage.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions +from taskflow import storage +from taskflow import test + + +class StorageTest(test.TestCase): + def test_save_and_get(self): + s = storage.Storage() + s.save('42', 5) + self.assertEquals(s.get('42'), 5) + + def test_get_non_existing_var(self): + s = storage.Storage() + with self.assertRaises(exceptions.NotFound): + s.get('42') + + def test_reset(self): + s = storage.Storage() + s.save('42', 5) + s.reset('42') + with self.assertRaises(exceptions.NotFound): + s.get('42') + + def test_reset_unknown_task(self): + s = storage.Storage() + self.assertEquals(s.reset('42'), None) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index cd055a0ec..9ccecc78e 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -18,6 +18,7 @@ # under the License. from distutils import version +import sys def get_task_version(task): @@ -58,3 +59,33 @@ class LastFedIter(object): yield self.first for i in self.rest_itr: yield i + + +class Failure(object): + """Indicates failure""" + # NOTE(imelnikov): flow_utils.FlowFailure uses runner, but + # engine code does not, so we need separate class + + def __init__(self, exc_info=None): + if exc_info is not None: + self._exc_info = exc_info + else: + self._exc_info = sys.exc_info() + + @property + def exc_info(self): + return self._exc_info + + @property + def exc(self): + return self._exc_info[1] + + def reraise(self): + raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + + def __str__(self): + try: + exc_name = self.exc_info[0].__name__ + except AttributeError: + exc_name = str(self.exc_info) + return 'Failure: %s: %s' % (exc_name, self.exc_info[1])