From c1b25762dc92f61670f93ada3c2bec883f308d1b Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Tue, 3 Sep 2013 16:11:18 +0400 Subject: [PATCH] Action engine: save task results Task results are saved on success and passed to revert method on revert. If task failed (execute method raised an exception), special object of type taskflow.utils.misc.Failure is created and saved as task result to indicate that. To store results Storage class is introduced. As for now it is simple tiny wrapper of a dictionary, but future plans include rewriting it using persistence API and adding more sophisticated functionality. Change-Id: Ic806421a3780653602d8879c33fdf655950a6804 --- taskflow/engines/action_engine/engine.py | 2 + taskflow/engines/action_engine/task_action.py | 15 +++--- taskflow/storage.py | 48 +++++++++++++++++++ taskflow/tests/unit/test_action_engine.py | 17 ++++--- taskflow/tests/unit/test_storage.py | 44 +++++++++++++++++ taskflow/utils/misc.py | 31 ++++++++++++ 6 files changed, 145 insertions(+), 12 deletions(-) create mode 100644 taskflow/storage.py create mode 100644 taskflow/tests/unit/test_storage.py diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index e38864277..758d3f5c0 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -23,6 +23,7 @@ from taskflow.engines.action_engine import task_action from taskflow import blocks from taskflow import states +from taskflow import storage class ActionEngine(object): @@ -34,6 +35,7 @@ class ActionEngine(object): def __init__(self, flow, action_map): self._action_map = action_map self._root = self._to_action(flow) + self.storage = storage.Storage() def _to_action(self, pattern): try: diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index efb2fd735..d5b0e52db 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -18,8 +18,7 @@ from taskflow.engines.action_engine import base_action as base from taskflow import states - -import sys +from taskflow.utils import misc class TaskAction(base.Action): @@ -28,6 +27,7 @@ class TaskAction(base.Action): self._task = block.task if isinstance(self._task, type): self._task = self._task() + self._id = block.uuid self.state = states.PENDING def execute(self, engine): @@ -35,10 +35,12 @@ class TaskAction(base.Action): self.state = states.RUNNING try: # TODO(imelnikov): pass only necessary args to task - self._task.execute() + result = self._task.execute() except Exception: - # TODO(imelnikov): save exception information - print sys.exc_info() + result = misc.Failure() + + engine.storage.save(self._id, result) + if isinstance(result, misc.Failure): self.state = states.FAILURE else: self.state = states.SUCCESS @@ -51,9 +53,10 @@ class TaskAction(base.Action): # task a chance for cleanup return try: - self._task.revert() + self._task.revert(result=engine.storage.get(self._id)) except Exception: self.state = states.FAILURE raise else: + engine.storage.reset(self._id) self.state = states.PENDING diff --git a/taskflow/storage.py b/taskflow/storage.py new file mode 100644 index 000000000..bf2920a64 --- /dev/null +++ b/taskflow/storage.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from taskflow import exceptions + + +class Storage(object): + """Manages task results""" + + # TODO(imelnikov): this should be implemented on top of logbook + + def __init__(self): + self._task_results = {} + + def save(self, uuid, data): + """Put result for task with id 'uuid' to storage""" + self._task_results[uuid] = data + + def get(self, uuid): + """Get result for task with id 'uuid' to storage""" + try: + return self._task_results[uuid] + except KeyError: + raise exceptions.NotFound("Result for task %r is not known" + % uuid) + + def reset(self, uuid): + """Remove result for task with id 'uuid' from storage""" + try: + del self._task_results[uuid] + except KeyError: + pass diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index ef104399d..311da3797 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -34,7 +34,8 @@ class TestTask(task.Task): return 5 def revert(self, **kwargs): - self.values.append(self.name + ' reverted') + self.values.append(self.name + ' reverted(%s)' + % kwargs.get('result')) class FailingTask(TestTask): @@ -71,8 +72,10 @@ class EngineTaskTest(EngineTestBase): def test_run_task_as_flow(self): flow = blocks.Task(TestTask(self.values, name='task1')) - self._make_engine(flow).run() + engine = self._make_engine(flow) + engine.run() self.assertEquals(self.values, ['task1']) + self.assertEquals(engine.storage.get(flow.uuid), 5) def test_invalid_block_raises(self): value = 'i am string, not block, sorry' @@ -124,7 +127,8 @@ class EngineLinearFlowTest(EngineTestBase): blocks.Task(NeverRunningTask) ) self._make_engine(flow).run() - self.assertEquals(self.values, ['fail reverted']) + self.assertEquals(self.values, + ['fail reverted(Failure: RuntimeError: Woot!)']) def test_correctly_reverts_children(self): flow = blocks.LinearFlow().add( @@ -136,9 +140,10 @@ class EngineLinearFlowTest(EngineTestBase): ) engine = self._make_engine(flow) engine.run() - self.assertEquals(self.values, ['task1', 'task2', - 'fail reverted', - 'task2 reverted', 'task1 reverted']) + self.assertEquals(self.values, + ['task1', 'task2', + 'fail reverted(Failure: RuntimeError: Woot!)', + 'task2 reverted(5)', 'task1 reverted(5)']) class SingleThreadedEngineTest(EngineTaskTest, diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py new file mode 100644 index 000000000..a51506b57 --- /dev/null +++ b/taskflow/tests/unit/test_storage.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions +from taskflow import storage +from taskflow import test + + +class StorageTest(test.TestCase): + def test_save_and_get(self): + s = storage.Storage() + s.save('42', 5) + self.assertEquals(s.get('42'), 5) + + def test_get_non_existing_var(self): + s = storage.Storage() + with self.assertRaises(exceptions.NotFound): + s.get('42') + + def test_reset(self): + s = storage.Storage() + s.save('42', 5) + s.reset('42') + with self.assertRaises(exceptions.NotFound): + s.get('42') + + def test_reset_unknown_task(self): + s = storage.Storage() + self.assertEquals(s.reset('42'), None) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index cd055a0ec..9ccecc78e 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -18,6 +18,7 @@ # under the License. from distutils import version +import sys def get_task_version(task): @@ -58,3 +59,33 @@ class LastFedIter(object): yield self.first for i in self.rest_itr: yield i + + +class Failure(object): + """Indicates failure""" + # NOTE(imelnikov): flow_utils.FlowFailure uses runner, but + # engine code does not, so we need separate class + + def __init__(self, exc_info=None): + if exc_info is not None: + self._exc_info = exc_info + else: + self._exc_info = sys.exc_info() + + @property + def exc_info(self): + return self._exc_info + + @property + def exc(self): + return self._exc_info[1] + + def reraise(self): + raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + + def __str__(self): + try: + exc_name = self.exc_info[0].__name__ + except AttributeError: + exc_name = str(self.exc_info) + return 'Failure: %s: %s' % (exc_name, self.exc_info[1])