diff --git a/taskflow/blocks/task.py b/taskflow/blocks/task.py index bb8ebb57..fdf61fc5 100644 --- a/taskflow/blocks/task.py +++ b/taskflow/blocks/task.py @@ -21,7 +21,46 @@ """ from taskflow.blocks import base -from taskflow.openstack.common import uuidutils +from taskflow.utils import reflection + + +def _save_as_to_mapping(save_as): + """Convert save_as to mapping name => index + + Result should follow taskflow.storage.Storage convention + for mappings. + """ + if save_as is None: + return None + if isinstance(save_as, basestring): + return {save_as: None} + elif isinstance(save_as, tuple): + return dict((key, num) for num, key in enumerate(save_as)) + raise TypeError('Task block save_as parameter ' + 'should be str or tuple, not %r' % save_as) + + +def _build_arg_mapping(rebind_args, task): + if rebind_args is None: + rebind_args = {} + task_args = reflection.get_required_callable_args(task.execute) + nargs = len(task_args) + if isinstance(rebind_args, (list, tuple)): + if len(rebind_args) < nargs: + raise ValueError('Task %(name)s takes %(nargs)d positional ' + 'arguments (%(real)d given)' + % dict(name=task.name, nargs=nargs, + real=len(rebind_args))) + result = dict(zip(task_args, rebind_args[:nargs])) + # extra rebind_args go to kwargs + result.update((a, a) for a in rebind_args[nargs:]) + return result + elif isinstance(rebind_args, dict): + result = dict((a, a) for a in task_args) + result.update(rebind_args) + return result + else: + raise TypeError('rebind_args should be list, tuple or dict') class Task(base.Block): @@ -30,18 +69,23 @@ class Task(base.Block): The task should be executed, and produced results should be saved. """ - def __init__(self, task, uuid=None): + def __init__(self, task, save_as=None, rebind_args=None): super(Task, self).__init__() self._task = task - if uuid is None: - self._id = uuidutils.generate_uuid() - else: - self._id = str(uuid) + if isinstance(self._task, type): + self._task = self._task() + + self._result_mapping = _save_as_to_mapping(save_as) + self._args_mapping = _build_arg_mapping(rebind_args, self._task) @property def task(self): return self._task @property - def uuid(self): - return self._id + def result_mapping(self): + return self._result_mapping + + @property + def args_mapping(self): + return self._args_mapping diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 758d3f5c..1acfed73 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -17,44 +17,75 @@ # License for the specific language governing permissions and limitations # under the License. - from taskflow.engines.action_engine import seq_action from taskflow.engines.action_engine import task_action from taskflow import blocks from taskflow import states -from taskflow import storage +from taskflow import storage as t_storage +from taskflow.utils import flow_utils +from taskflow.utils import misc class ActionEngine(object): - """Generic action-based engine + """Generic action-based engine. Converts the flow to recursive structure of actions. """ - def __init__(self, flow, action_map): + def __init__(self, flow, action_map, storage): self._action_map = action_map - self._root = self._to_action(flow) - self.storage = storage.Storage() + self.notifier = flow_utils.TransitionNotifier() + self.task_notifier = flow_utils.TransitionNotifier() + self.storage = storage + self.failures = [] + self._root = self.to_action(flow) - def _to_action(self, pattern): + def to_action(self, pattern): try: factory = self._action_map[type(pattern)] except KeyError: raise ValueError('Action of unknown type: %s (type %s)' % (pattern, type(pattern))) - return factory(pattern, self._to_action) + return factory(pattern, self) + + def _revert(self, current_failure): + self._change_state(states.REVERTING) + self._root.revert(self) + self._change_state(states.REVERTED) + if self.failures: + self.failures[0].reraise() + else: + current_failure.reraise() def run(self): - status = self._root.execute(self) - if status == states.FAILURE: - self._root.revert(self) + self._change_state(states.RUNNING) + try: + self._root.execute(self) + except Exception: + self._revert(misc.Failure()) + else: + self._change_state(states.SUCCESS) + + def _change_state(self, state): + self.storage.set_flow_state(state) + details = dict(engine=self) + self.notifier.notify(state, details) + + def on_task_state_change(self, task_action, state, result=None): + if isinstance(result, misc.Failure): + self.failures.append(result) + details = dict(engine=self, + task_name=task_action.name, + task_uuid=task_action.uuid, + result=result) + self.task_notifier.notify(state, details) class SingleThreadedActionEngine(ActionEngine): - def __init__(self, flow): + def __init__(self, flow, flow_detail=None): ActionEngine.__init__(self, flow, { blocks.Task: task_action.TaskAction, blocks.LinearFlow: seq_action.SequentialAction, blocks.ParallelFlow: seq_action.SequentialAction - }) + }, t_storage.Storage(flow_detail)) diff --git a/taskflow/engines/action_engine/seq_action.py b/taskflow/engines/action_engine/seq_action.py index b68a96bd..eed27691 100644 --- a/taskflow/engines/action_engine/seq_action.py +++ b/taskflow/engines/action_engine/seq_action.py @@ -17,27 +17,17 @@ # under the License. from taskflow.engines.action_engine import base_action as base -from taskflow import states class SequentialAction(base.Action): - def __init__(self, pattern, to_action): - self._history = [] - self._actions = [to_action(pat) for pat in pattern.children] + def __init__(self, pattern, engine): + self._actions = [engine.to_action(pat) for pat in pattern.children] def execute(self, engine): - state = states.SUCCESS for action in self._actions: - #TODO(imelnikov): save history to storage - self._history.append(action) - state = action.execute(engine) - if state != states.SUCCESS: - break - return state + action.execute(engine) # raises on failure def revert(self, engine): - while self._history: - action = self._history[-1] + for action in reversed(self._actions): action.revert(engine) - self._history.pop() diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index d5b0e52d..155812e3 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -17,46 +17,75 @@ # under the License. from taskflow.engines.action_engine import base_action as base +from taskflow import exceptions +from taskflow.openstack.common import excutils +from taskflow.openstack.common import uuidutils from taskflow import states from taskflow.utils import misc class TaskAction(base.Action): - def __init__(self, block, _to_action): + def __init__(self, block, engine): self._task = block.task - if isinstance(self._task, type): - self._task = self._task() - self._id = block.uuid - self.state = states.PENDING + self._result_mapping = block.result_mapping + self._args_mapping = block.args_mapping + try: + self._id = engine.storage.get_uuid_by_name(self._task.name) + except exceptions.NotFound: + self._id = uuidutils.generate_uuid() + engine.storage.add_task(task_name=self.name, uuid=self.uuid) + engine.storage.set_result_mapping(self.uuid, self._result_mapping) + + @property + def name(self): + return self._task.name + + @property + def uuid(self): + return self._id + + def _change_state(self, engine, state): + """Check and update state of task.""" + engine.storage.set_task_state(self.uuid, state) + engine.on_task_state_change(self, state) + + def _update_result(self, engine, state, result=None): + """Update result and change state.""" + if state == states.PENDING: + engine.storage.reset(self.uuid) + else: + engine.storage.save(self.uuid, result, state) + engine.on_task_state_change(self, state, result) def execute(self, engine): - # TODO(imelnikov): notifications - self.state = states.RUNNING - try: - # TODO(imelnikov): pass only necessary args to task - result = self._task.execute() - except Exception: - result = misc.Failure() + if engine.storage.get_task_state(self.uuid) == states.SUCCESS: + return + kwargs = engine.storage.fetch_mapped_args(self._args_mapping) - engine.storage.save(self._id, result) - if isinstance(result, misc.Failure): - self.state = states.FAILURE + self._change_state(engine, states.RUNNING) + try: + result = self._task.execute(**kwargs) + except Exception: + failure = misc.Failure() + self._update_result(engine, states.FAILURE, failure) + failure.reraise() else: - self.state = states.SUCCESS - return self.state + self._update_result(engine, states.SUCCESS, result) def revert(self, engine): - if self.state == states.PENDING: # pragma: no cover + if engine.storage.get_task_state(self.uuid) == states.PENDING: # NOTE(imelnikov): in all the other states, the task # execution was at least attempted, so we should give # task a chance for cleanup return + kwargs = engine.storage.fetch_mapped_args(self._args_mapping) + self._change_state(engine, states.REVERTING) try: - self._task.revert(result=engine.storage.get(self._id)) + self._task.revert(result=engine.storage.get(self._id), + **kwargs) except Exception: - self.state = states.FAILURE - raise + with excutils.save_and_reraise_exception(): + self._change_state(engine, states.FAILURE) else: - engine.storage.reset(self._id) - self.state = states.PENDING + self._update_result(engine, states.PENDING) diff --git a/taskflow/persistence/flowdetail.py b/taskflow/persistence/flowdetail.py index 75f27f21..2cad3a2d 100644 --- a/taskflow/persistence/flowdetail.py +++ b/taskflow/persistence/flowdetail.py @@ -58,6 +58,12 @@ class FlowDetail(object): return self_td return None + def find_by_name(self, td_name): + for self_td in self: + if self_td.name == td_name: + return self_td + return None + def save(self): """Saves *most* of the components of this given object. diff --git a/taskflow/storage.py b/taskflow/storage.py index bf2920a6..5e2776d3 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -16,33 +16,175 @@ # License for the specific language governing permissions and limitations # under the License. - from taskflow import exceptions +from taskflow.openstack.common import uuidutils +from taskflow.persistence import flowdetail +from taskflow.persistence import logbook +from taskflow.persistence import taskdetail +from taskflow import states + + +def temporary_flow_detail(): + """Creates flow detail class for temporary usage + + Creates in-memory logbook and flow detail in it. Should + be useful for tests and other use cases where persistence + is not needed + """ + lb = logbook.LogBook('tmp', backend='memory') + fd = flowdetail.FlowDetail( + name='tmp', uuid=uuidutils.generate_uuid(), + backend='memory') + lb.add(fd) + lb.save() + fd.save() + return fd + + +STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE) class Storage(object): - """Manages task results""" + """Interface between engines and logbook - # TODO(imelnikov): this should be implemented on top of logbook + This class provides simple interface to save task details and + results to persistence layer for use by engines. + """ - def __init__(self): - self._task_results = {} + injector_name = '_TaskFlow_INJECTOR' - def save(self, uuid, data): + def __init__(self, flow_detail=None): + self._result_mappings = {} + self._reverse_mapping = {} + + if flow_detail is None: + # TODO(imelnikov): this is useful mainly for tests; + # maybe we should make flow_detail required parameter? + self._flowdetail = temporary_flow_detail() + else: + self._flowdetail = flow_detail + + def add_task(self, uuid, task_name): + """Add the task to storage + + Task becomes known to storage by that name and uuid. + Task state is set to PENDING. + """ + # TODO(imelnikov): check that task with same uuid or + # task name does not exist + td = taskdetail.TaskDetail(name=task_name, uuid=uuid) + td.state = states.PENDING + self._flowdetail.add(td) + self._flowdetail.save() + td.save() + + def get_uuid_by_name(self, task_name): + """Get uuid of task with given name""" + td = self._flowdetail.find_by_name(task_name) + if td is not None: + return td.uuid + else: + raise exceptions.NotFound("Unknown task name: %r" % task_name) + + def _taskdetail_by_uuid(self, uuid): + td = self._flowdetail.find(uuid) + if td is None: + raise exceptions.NotFound("Unknown task: %r" % uuid) + return td + + def set_task_state(self, uuid, state): + """Set task state""" + td = self._taskdetail_by_uuid(uuid) + td.state = state + td.save() + + def get_task_state(self, uuid): + """Get state of task with given uuid""" + return self._taskdetail_by_uuid(uuid).state + + def save(self, uuid, data, state=states.SUCCESS): """Put result for task with id 'uuid' to storage""" - self._task_results[uuid] = data + td = self._taskdetail_by_uuid(uuid) + td.state = state + td.results = data + td.save() def get(self, uuid): """Get result for task with id 'uuid' to storage""" - try: - return self._task_results[uuid] - except KeyError: - raise exceptions.NotFound("Result for task %r is not known" - % uuid) + td = self._taskdetail_by_uuid(uuid) + if td.state not in STATES_WITH_RESULTS: + raise exceptions.NotFound("Result for task %r is not known" % uuid) + return td.results - def reset(self, uuid): + def reset(self, uuid, state=states.PENDING): """Remove result for task with id 'uuid' from storage""" + td = self._taskdetail_by_uuid(uuid) + td.results = None + td.state = state + td.save() + + def inject(self, pairs): + """Add values into storage + + This method should be used by job in order to put flow parameters + into storage and put it to action. + """ + pairs = dict(pairs) + injector_uuid = uuidutils.generate_uuid() + self.add_task(injector_uuid, self.injector_name) + self.save(injector_uuid, pairs) + self._reverse_mapping.update((name, (injector_uuid, name)) + for name in pairs) + + def set_result_mapping(self, uuid, mapping): + """Set mapping for naming task results + + The result saved with given uuid would be accessible by names + defined in mapping. Mapping is a dict name => index. If index + is None, the whole result will have this name; else, only + part of it, result[index]. + """ + if not mapping: + return + self._result_mappings[uuid] = mapping + for name, index in mapping.iteritems(): + self._reverse_mapping[name] = (uuid, index) + + def fetch(self, name): + """Fetch named task result""" try: - del self._task_results[uuid] + uuid, index = self._reverse_mapping[name] except KeyError: - pass + raise exceptions.NotFound("Name %r is not mapped" % name) + result = self.get(uuid) + if index is None: + return result + else: + return result[index] + + def fetch_all(self): + """Fetch all named task results known so far + + Should be used for debugging and testing purposes mostly. + """ + result = {} + for name in self._reverse_mapping: + try: + result[name] = self.fetch(name) + except exceptions.NotFound: + pass + return result + + def fetch_mapped_args(self, args_mapping): + """Fetch arguments for the task using arguments mapping""" + return dict((key, self.fetch(name)) + for key, name in args_mapping.iteritems()) + + def set_flow_state(self, state): + """Set flowdetails state and save it""" + self._flowdetail.state = state + self._flowdetail.save() + + def get_flow_state(self): + """Set state from flowdetails""" + return self._flowdetail.state diff --git a/taskflow/task.py b/taskflow/task.py index 6b7dca69..34be3562 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -54,7 +54,7 @@ class BaseTask(object): return "%s==%s" % (self.name, misc.get_task_version(self)) @abc.abstractmethod - def execute(self, context, *args, **kwargs): + def execute(self, *args, **kwargs): """Activate a given task which will perform some operation and return. This method can be used to apply some given context and given set @@ -63,7 +63,7 @@ class BaseTask(object): back into this task if reverting is triggered. """ - def revert(self, context, result, cause): + def revert(self, *args, **kwargs): """Revert this task using the given context, result that the apply provided as well as any information which may have caused said reversion. diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 311da379..3f1c31ff 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -17,6 +17,10 @@ # under the License. from taskflow import blocks +from taskflow import exceptions +from taskflow.persistence import taskdetail +from taskflow import states +from taskflow import storage from taskflow import task from taskflow import test @@ -25,9 +29,12 @@ from taskflow.engines.action_engine import engine as eng class TestTask(task.Task): - def __init__(self, values, name): + def __init__(self, values=None, name=None): super(TestTask, self).__init__(name) - self.values = values + if values is None: + self.values = [] + else: + self.values = values def execute(self, **kwargs): self.values.append(self.name) @@ -39,6 +46,7 @@ class TestTask(task.Task): class FailingTask(TestTask): + def execute(self, **kwargs): raise RuntimeError('Woot!') @@ -59,12 +67,22 @@ class NastyTask(task.Task): raise RuntimeError('Gotcha!') +class MultiReturnTask(task.Task): + def execute(self, **kwargs): + return 12, 2, 1 + + +class MultiargsTask(task.Task): + def execute(self, a, b, c): + return a + b + c + + class EngineTestBase(object): def setUp(self): super(EngineTestBase, self).setUp() self.values = [] - def _make_engine(self, _flow): + def _make_engine(self, _flow, _flow_detail=None): raise NotImplementedError() @@ -75,7 +93,49 @@ class EngineTaskTest(EngineTestBase): engine = self._make_engine(flow) engine.run() self.assertEquals(self.values, ['task1']) - self.assertEquals(engine.storage.get(flow.uuid), 5) + + @staticmethod + def _callback(state, values, details): + name = details.get('task_name', '') + values.append('%s %s' % (name, state)) + + @staticmethod + def _flow_callback(state, values, details): + values.append('flow %s' % state) + + def test_run_task_with_notifications(self): + flow = blocks.Task(TestTask(self.values, name='task1')) + engine = self._make_engine(flow) + engine.notifier.register('*', self._flow_callback, + kwargs={'values': self.values}) + engine.task_notifier.register('*', self._callback, + kwargs={'values': self.values}) + engine.run() + self.assertEquals(self.values, + ['flow RUNNING', + 'task1 RUNNING', + 'task1', + 'task1 SUCCESS', + 'flow SUCCESS']) + + def test_failing_task_with_notifications(self): + flow = blocks.Task(FailingTask(self.values, 'fail')) + engine = self._make_engine(flow) + engine.notifier.register('*', self._flow_callback, + kwargs={'values': self.values}) + engine.task_notifier.register('*', self._callback, + kwargs={'values': self.values}) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + self.assertEquals(self.values, + ['flow RUNNING', + 'fail RUNNING', + 'fail FAILURE', + 'flow REVERTING', + 'fail REVERTING', + 'fail reverted(Failure: RuntimeError: Woot!)', + 'fail PENDING', + 'flow REVERTED']) def test_invalid_block_raises(self): value = 'i am string, not block, sorry' @@ -84,6 +144,113 @@ class EngineTaskTest(EngineTestBase): self._make_engine(flow) self.assertIn(value, str(err.exception)) + def test_save_as(self): + flow = blocks.Task(TestTask(self.values, name='task1'), + save_as='first_data') + engine = self._make_engine(flow) + engine.run() + self.assertEquals(self.values, ['task1']) + self.assertEquals(engine.storage.fetch_all(), {'first_data': 5}) + + def test_save_all_in_one(self): + flow = blocks.Task(MultiReturnTask, save_as='all_data') + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), + {'all_data': (12, 2, 1)}) + + def test_save_several_values(self): + flow = blocks.Task(MultiReturnTask, + save_as=('badger', 'mushroom', 'snake')) + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'badger': 12, + 'mushroom': 2, + 'snake': 1 + }) + + def test_bad_save_as_value(self): + with self.assertRaises(TypeError): + blocks.Task(TestTask(name='task1'), + save_as=object()) + + def test_arguments_passing(self): + flow = blocks.Task(MultiargsTask, save_as='result') + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'a': 1, 'b': 4, 'c': 9, 'x': 17, + 'result': 14, + }) + + def test_arguments_missing(self): + flow = blocks.Task(MultiargsTask, save_as='result') + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'x': 17}) + with self.assertRaisesRegexp(exceptions.NotFound, + "^Name 'c' is not mapped"): + engine.run() + + def test_partial_arguments_mapping(self): + flow = blocks.Task(MultiargsTask(name='task1'), + save_as='result', + rebind_args={'b': 'x'}) + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'a': 1, 'b': 4, 'c': 9, 'x': 17, + 'result': 27, + }) + + def test_all_arguments_mapping(self): + flow = blocks.Task(MultiargsTask(name='task1'), + save_as='result', + rebind_args=['x', 'y', 'z']) + engine = self._make_engine(flow) + engine.storage.inject({ + 'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6 + }) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'a': 1, 'b': 2, 'c': 3, 'x': 4, 'y': 5, 'z': 6, + 'result': 15, + }) + + def test_not_enough_arguments_for_task(self): + msg = '^Task task1 takes 3 positional arguments' + with self.assertRaisesRegexp(ValueError, msg): + blocks.Task(MultiargsTask(name='task1'), + save_as='result', + rebind_args=['x', 'y']) + + def test_invalid_argument_name_map(self): + flow = blocks.Task(MultiargsTask(name='task1'), + save_as='result', + rebind_args={'b': 'z'}) + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) + with self.assertRaisesRegexp(exceptions.NotFound, + "Name 'z' is not mapped"): + engine.run() + + def test_invalid_argument_name_list(self): + flow = blocks.Task(MultiargsTask(name='task1'), + save_as='result', + rebind_args=['a', 'z', 'b']) + engine = self._make_engine(flow) + engine.storage.inject({'a': 1, 'b': 4, 'c': 9, 'x': 17}) + with self.assertRaisesRegexp(exceptions.NotFound, + "Name 'z' is not mapped"): + engine.run() + + def test_bad_rebind_args_value(self): + with self.assertRaises(TypeError): + blocks.Task(TestTask(name='task1'), + rebind_args=object()) + class EngineLinearFlowTest(EngineTestBase): @@ -102,6 +269,17 @@ class EngineLinearFlowTest(EngineTestBase): self._make_engine(flow).run() self.assertEquals(self.values, ['task1', 'task2']) + def test_revert_removes_data(self): + flow = blocks.LinearFlow().add( + blocks.Task(TestTask, save_as='one'), + blocks.Task(MultiReturnTask, save_as=('a', 'b', 'c')), + blocks.Task(FailingTask(name='fail')) + ) + engine = self._make_engine(flow) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() + self.assertEquals(engine.storage.fetch_all(), {}) + def test_sequential_flow_nested_blocks(self): flow = blocks.LinearFlow().add( blocks.Task(TestTask(self.values, 'task1')), @@ -115,7 +293,7 @@ class EngineLinearFlowTest(EngineTestBase): def test_revert_exception_is_reraised(self): flow = blocks.LinearFlow().add( blocks.Task(NastyTask), - blocks.Task(FailingTask(self.values, 'fail')) + blocks.Task(FailingTask(name='fail')) ) engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Gotcha'): @@ -126,7 +304,9 @@ class EngineLinearFlowTest(EngineTestBase): blocks.Task(FailingTask(self.values, 'fail')), blocks.Task(NeverRunningTask) ) - self._make_engine(flow).run() + engine = self._make_engine(flow) + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() self.assertEquals(self.values, ['fail reverted(Failure: RuntimeError: Woot!)']) @@ -139,15 +319,37 @@ class EngineLinearFlowTest(EngineTestBase): ) ) engine = self._make_engine(flow) - engine.run() + with self.assertRaisesRegexp(RuntimeError, '^Woot'): + engine.run() self.assertEquals(self.values, ['task1', 'task2', 'fail reverted(Failure: RuntimeError: Woot!)', 'task2 reverted(5)', 'task1 reverted(5)']) + def test_sequential_flow_two_tasks_with_resumption(self): + flow = blocks.LinearFlow().add( + blocks.Task(TestTask(self.values, name='task1'), save_as='x1'), + blocks.Task(TestTask(self.values, name='task2'), save_as='x2') + ) + + # Create FlowDetail as if we already run task1 + fd = storage.temporary_flow_detail() + td = taskdetail.TaskDetail(name='task1', uuid='42') + td.state = states.SUCCESS + td.results = 17 + fd.add(td) + fd.save() + td.save() + + engine = self._make_engine(flow, fd) + engine.run() + self.assertEquals(self.values, ['task2']) + self.assertEquals(engine.storage.fetch_all(), + {'x1': 17, 'x2': 5}) + class SingleThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, test.TestCase): - def _make_engine(self, flow): - return eng.SingleThreadedActionEngine(flow) + def _make_engine(self, flow, flow_detail=None): + return eng.SingleThreadedActionEngine(flow, flow_detail=flow_detail) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index a51506b5..b0cf0e53 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -17,28 +17,147 @@ # under the License. from taskflow import exceptions +from taskflow import states from taskflow import storage from taskflow import test class StorageTest(test.TestCase): + + def test_add_task(self): + s = storage.Storage() + s.add_task('42', 'my task') + self.assertEquals(s.get_task_state('42'), states.PENDING) + def test_save_and_get(self): s = storage.Storage() + s.add_task('42', 'my task') s.save('42', 5) self.assertEquals(s.get('42'), 5) + self.assertEquals(s.fetch_all(), {}) + self.assertEquals(s.get_task_state('42'), states.SUCCESS) + + def test_save_and_get_other_state(self): + s = storage.Storage() + s.add_task('42', 'my task') + s.save('42', 5, states.FAILURE) + self.assertEquals(s.get('42'), 5) + self.assertEquals(s.get_task_state('42'), states.FAILURE) def test_get_non_existing_var(self): s = storage.Storage() + s.add_task('42', 'my task') with self.assertRaises(exceptions.NotFound): s.get('42') def test_reset(self): s = storage.Storage() + s.add_task('42', 'my task') s.save('42', 5) s.reset('42') + self.assertEquals(s.get_task_state('42'), states.PENDING) with self.assertRaises(exceptions.NotFound): s.get('42') def test_reset_unknown_task(self): s = storage.Storage() + s.add_task('42', 'my task') self.assertEquals(s.reset('42'), None) + + def test_fetch_by_name(self): + s = storage.Storage() + s.add_task('42', 'my task') + name = 'my result' + s.set_result_mapping('42', {name: None}) + s.save('42', 5) + self.assertEquals(s.fetch(name), 5) + self.assertEquals(s.fetch_all(), {name: 5}) + + def test_fetch_unknown_name(self): + s = storage.Storage() + with self.assertRaisesRegexp(exceptions.NotFound, + "^Name 'xxx' is not mapped"): + s.fetch('xxx') + + def test_fetch_result_not_ready(self): + s = storage.Storage() + s.add_task('42', 'my task') + name = 'my result' + s.set_result_mapping('42', {name: None}) + with self.assertRaises(exceptions.NotFound): + s.get(name) + self.assertEquals(s.fetch_all(), {}) + + def test_save_multiple_results(self): + s = storage.Storage() + s.add_task('42', 'my task') + s.set_result_mapping('42', {'foo': 0, 'bar': 1, 'whole': None}) + s.save('42', ('spam', 'eggs')) + self.assertEquals(s.fetch_all(), { + 'foo': 'spam', + 'bar': 'eggs', + 'whole': ('spam', 'eggs') + }) + + def test_mapping_none(self): + s = storage.Storage() + s.add_task('42', 'my task') + s.set_result_mapping('42', None) + s.save('42', 5) + self.assertEquals(s.fetch_all(), {}) + + def test_inject(self): + s = storage.Storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + self.assertEquals(s.fetch('spam'), 'eggs') + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + + def test_fetch_meapped_args(self): + s = storage.Storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + self.assertEquals(s.fetch_mapped_args({'viking': 'spam'}), + {'viking': 'eggs'}) + + def test_fetch_not_found_args(self): + s = storage.Storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + with self.assertRaises(exceptions.NotFound): + s.fetch_mapped_args({'viking': 'helmet'}) + + def test_set_and_get_task_state(self): + s = storage.Storage() + state = states.PENDING + s.add_task('42', 'my task') + s.set_task_state('42', state) + self.assertEquals(s.get_task_state('42'), state) + + def test_get_state_of_unknown_task(self): + s = storage.Storage() + with self.assertRaisesRegexp(exceptions.NotFound, '^Unknown'): + s.get_task_state('42') + + def test_task_by_name(self): + s = storage.Storage() + s.add_task('42', 'my task') + self.assertEquals(s.get_uuid_by_name('my task'), '42') + + def test_unknown_task_by_name(self): + s = storage.Storage() + with self.assertRaisesRegexp(exceptions.NotFound, + '^Unknown task name:'): + s.get_uuid_by_name('42') + + def test_get_flow_state(self): + fd = storage.temporary_flow_detail() + fd.state = states.INTERRUPTED + fd.save() + s = storage.Storage(fd) + self.assertEquals(s.get_flow_state(), states.INTERRUPTED) + + def test_set_and_get_flow_state(self): + s = storage.Storage() + s.set_flow_state(states.SUCCESS) + self.assertEquals(s.get_flow_state(), states.SUCCESS)