Action engine: save task results

Task results are saved on success and passed to revert method
on revert. If task failed (execute method raised an exception),
special object of type taskflow.utils.misc.Failure is created
and saved as task result to indicate that.

To store results Storage class is introduced. As for now it is
simple tiny wrapper of a dictionary, but future plans include
rewriting it using persistence API and adding more sophisticated
functionality.

Change-Id: Ic806421a3780653602d8879c33fdf655950a6804
This commit is contained in:
Ivan A. Melnikov
2013-09-03 16:11:18 +04:00
committed by Joshua Harlow
parent 57c4b1bdc2
commit c1b25762dc
6 changed files with 145 additions and 12 deletions

View File

@@ -23,6 +23,7 @@ from taskflow.engines.action_engine import task_action
from taskflow import blocks
from taskflow import states
from taskflow import storage
class ActionEngine(object):
@@ -34,6 +35,7 @@ class ActionEngine(object):
def __init__(self, flow, action_map):
self._action_map = action_map
self._root = self._to_action(flow)
self.storage = storage.Storage()
def _to_action(self, pattern):
try:

View File

@@ -18,8 +18,7 @@
from taskflow.engines.action_engine import base_action as base
from taskflow import states
import sys
from taskflow.utils import misc
class TaskAction(base.Action):
@@ -28,6 +27,7 @@ class TaskAction(base.Action):
self._task = block.task
if isinstance(self._task, type):
self._task = self._task()
self._id = block.uuid
self.state = states.PENDING
def execute(self, engine):
@@ -35,10 +35,12 @@ class TaskAction(base.Action):
self.state = states.RUNNING
try:
# TODO(imelnikov): pass only necessary args to task
self._task.execute()
result = self._task.execute()
except Exception:
# TODO(imelnikov): save exception information
print sys.exc_info()
result = misc.Failure()
engine.storage.save(self._id, result)
if isinstance(result, misc.Failure):
self.state = states.FAILURE
else:
self.state = states.SUCCESS
@@ -51,9 +53,10 @@ class TaskAction(base.Action):
# task a chance for cleanup
return
try:
self._task.revert()
self._task.revert(result=engine.storage.get(self._id))
except Exception:
self.state = states.FAILURE
raise
else:
engine.storage.reset(self._id)
self.state = states.PENDING

48
taskflow/storage.py Normal file
View File

@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions
class Storage(object):
"""Manages task results"""
# TODO(imelnikov): this should be implemented on top of logbook
def __init__(self):
self._task_results = {}
def save(self, uuid, data):
"""Put result for task with id 'uuid' to storage"""
self._task_results[uuid] = data
def get(self, uuid):
"""Get result for task with id 'uuid' to storage"""
try:
return self._task_results[uuid]
except KeyError:
raise exceptions.NotFound("Result for task %r is not known"
% uuid)
def reset(self, uuid):
"""Remove result for task with id 'uuid' from storage"""
try:
del self._task_results[uuid]
except KeyError:
pass

View File

@@ -34,7 +34,8 @@ class TestTask(task.Task):
return 5
def revert(self, **kwargs):
self.values.append(self.name + ' reverted')
self.values.append(self.name + ' reverted(%s)'
% kwargs.get('result'))
class FailingTask(TestTask):
@@ -71,8 +72,10 @@ class EngineTaskTest(EngineTestBase):
def test_run_task_as_flow(self):
flow = blocks.Task(TestTask(self.values, name='task1'))
self._make_engine(flow).run()
engine = self._make_engine(flow)
engine.run()
self.assertEquals(self.values, ['task1'])
self.assertEquals(engine.storage.get(flow.uuid), 5)
def test_invalid_block_raises(self):
value = 'i am string, not block, sorry'
@@ -124,7 +127,8 @@ class EngineLinearFlowTest(EngineTestBase):
blocks.Task(NeverRunningTask)
)
self._make_engine(flow).run()
self.assertEquals(self.values, ['fail reverted'])
self.assertEquals(self.values,
['fail reverted(Failure: RuntimeError: Woot!)'])
def test_correctly_reverts_children(self):
flow = blocks.LinearFlow().add(
@@ -136,9 +140,10 @@ class EngineLinearFlowTest(EngineTestBase):
)
engine = self._make_engine(flow)
engine.run()
self.assertEquals(self.values, ['task1', 'task2',
'fail reverted',
'task2 reverted', 'task1 reverted'])
self.assertEquals(self.values,
['task1', 'task2',
'fail reverted(Failure: RuntimeError: Woot!)',
'task2 reverted(5)', 'task1 reverted(5)'])
class SingleThreadedEngineTest(EngineTaskTest,

View File

@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions
from taskflow import storage
from taskflow import test
class StorageTest(test.TestCase):
def test_save_and_get(self):
s = storage.Storage()
s.save('42', 5)
self.assertEquals(s.get('42'), 5)
def test_get_non_existing_var(self):
s = storage.Storage()
with self.assertRaises(exceptions.NotFound):
s.get('42')
def test_reset(self):
s = storage.Storage()
s.save('42', 5)
s.reset('42')
with self.assertRaises(exceptions.NotFound):
s.get('42')
def test_reset_unknown_task(self):
s = storage.Storage()
self.assertEquals(s.reset('42'), None)

View File

@@ -18,6 +18,7 @@
# under the License.
from distutils import version
import sys
def get_task_version(task):
@@ -58,3 +59,33 @@ class LastFedIter(object):
yield self.first
for i in self.rest_itr:
yield i
class Failure(object):
"""Indicates failure"""
# NOTE(imelnikov): flow_utils.FlowFailure uses runner, but
# engine code does not, so we need separate class
def __init__(self, exc_info=None):
if exc_info is not None:
self._exc_info = exc_info
else:
self._exc_info = sys.exc_info()
@property
def exc_info(self):
return self._exc_info
@property
def exc(self):
return self._exc_info[1]
def reraise(self):
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
def __str__(self):
try:
exc_name = self.exc_info[0].__name__
except AttributeError:
exc_name = str(self.exc_info)
return 'Failure: %s: %s' % (exc_name, self.exc_info[1])