From 8218f57ad2871bc1050dc7e6ecd57af14be0c8c4 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 6 Sep 2013 19:03:30 -0700 Subject: [PATCH] Allow provides to be a set and results to be a dictionary Instead of forcing task provides to be a tuple or list and using the corresponding numerical index to determine how to interpret the tasks results we should also allow for the task provides to be a set and then allow for the tasks result to be interpreted as a dictionary. Fixes bug 1221998 Change-Id: Ibae689e7975d6782aa248d2b6e3691c44a89ef9f --- taskflow/engines/action_engine/task_action.py | 4 +++ taskflow/storage.py | 10 +++++++ taskflow/task.py | 17 ++++++++++- taskflow/tests/unit/test_action_engine.py | 18 ++++++++++++ taskflow/tests/unit/test_linear_flow.py | 28 +++++++++++++++++-- taskflow/tests/utils.py | 23 +++++++++++---- 6 files changed, 91 insertions(+), 9 deletions(-) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 1a8282bb..80be36b3 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -33,6 +33,10 @@ class TaskAction(base.Action): try: self._id = engine.storage.get_uuid_by_name(self._task.name) except exceptions.NotFound: + # TODO(harlowja): we might need to save whether the results of this + # task will be a tuple + other additional metadata when doing this + # add to the underlying storage backend for later resumption of + # this task. self._id = uuidutils.generate_uuid() engine.storage.add_task(task_name=self.name, uuid=self.uuid) engine.storage.set_result_mapping(self.uuid, self._result_mapping) diff --git a/taskflow/storage.py b/taskflow/storage.py index 95d739ef..5b8acdf8 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -168,6 +168,16 @@ class Storage(object): else: return result[index] except exceptions.NotFound: + # NOTE(harlowja): No result was found for the given uuid. + pass + except (KeyError, IndexError, TypeError): + # NOTE(harlowja): The result that the uuid returned can not be + # accessed in the manner that the index is requesting. Perhaps + # the result is a dictionary-like object and that key does + # not exist (key error), or the result is a tuple/list and a + # non-numeric key is being requested (index error), or there + # was no result and an attempt to index into None is being + # requested (type error). pass raise exceptions.NotFound("Unable to find result %r" % name) diff --git a/taskflow/task.py b/taskflow/task.py index 21731839..20db472f 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -28,14 +28,29 @@ def _save_as_to_mapping(save_as): Result should follow storage convention for mappings. """ + # TODO(harlowja): we should probably document this behavior & convention + # outside of code so that its more easily understandable, since what a task + # returns is pretty crucial for other later operations. if save_as is None: return {} if isinstance(save_as, basestring): + # NOTE(harlowja): this means that your task will only return one item + # instead of a dictionary-like object or a indexable object (like a + # list or tuple). return {save_as: None} elif isinstance(save_as, (tuple, list)): + # NOTE(harlowja): this means that your task will return a indexable + # object, like a list or tuple and the results can be mapped by index + # to that tuple/list that is returned for others to use. return dict((key, num) for num, key in enumerate(save_as)) + elif isinstance(save_as, set): + # NOTE(harlowja): in the case where a set is given we will not be + # able to determine the numeric ordering in a reliable way (since it is + # a unordered set) so the only way for us to easily map the result of + # the task will be via the key itself. + return dict((key, key) for key in save_as) raise TypeError('Task provides parameter ' - 'should be str or tuple/list, not %r' % save_as) + 'should be str, set or tuple/list, not %r' % save_as) def _build_rebind_dict(args, rebind_args): diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index c6db63e5..538c826c 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -91,6 +91,14 @@ class MultiargsTask(task.Task): return a + b + c +class MultiDictTask(task.Task): + def execute(self): + output = {} + for i, k in enumerate(sorted(self.provides.keys())): + output[k] = i + return output + + class EngineTestBase(object): def setUp(self): super(EngineTestBase, self).setUp() @@ -186,6 +194,16 @@ class EngineTaskTest(EngineTestBase): 'snake': 1 }) + def test_save_dict(self): + flow = MultiDictTask(provides=set(['badger', 'mushroom', 'snake'])) + engine = self._make_engine(flow) + engine.run() + self.assertEquals(engine.storage.fetch_all(), { + 'badger': 0, + 'mushroom': 1, + 'snake': 2, + }) + def test_bad_save_as_value(self): with self.assertRaises(TypeError): TestTask(name='task1', provides=object()) diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index ba5b80c9..21715e36 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -242,10 +242,30 @@ class LinearFlowTest(test.TestCase): e = _make_engine(wf) self.assertRaises(exc.NotFound, e.run) - def test_flow_good_order(self): + def test_flow_set_order(self): wf = lw.Flow("the-test-action") wf.add(utils.ProvidesRequiresTask('test-1', - requires=set(), + requires=[], + provides=set(['a', 'b']))) + wf.add(utils.ProvidesRequiresTask('test-2', + requires=set(['a', 'b']), + provides=set([]))) + e = _make_engine(wf) + e.run() + run_context = e.storage.fetch('context') + ordering = run_context[utils.ORDER_KEY] + self.assertEquals(2, len(ordering)) + self.assertEquals('test-1', ordering[0]['name']) + self.assertEquals('test-2', ordering[1]['name']) + self.assertEquals({'a': 'a', 'b': 'b'}, + ordering[1][utils.KWARGS_KEY]) + self.assertEquals({}, + ordering[0][utils.KWARGS_KEY]) + + def test_flow_list_order(self): + wf = lw.Flow("the-test-action") + wf.add(utils.ProvidesRequiresTask('test-1', + requires=[], provides=['a', 'b'])) wf.add(utils.ProvidesRequiresTask('test-2', requires=['a', 'b'], @@ -265,3 +285,7 @@ class LinearFlowTest(test.TestCase): e = _make_engine(wf) e.run() + run_context = e.storage.fetch('context') + ordering = run_context[utils.ORDER_KEY] + for i, entry in enumerate(ordering): + self.assertEquals('test-%s' % (i + 1), entry['name']) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 98c64e5b..e34ff6a5 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -40,19 +40,30 @@ def drain(lst): class ProvidesRequiresTask(task.Task): - def __init__(self, name, provides, requires): + def __init__(self, name, provides, requires, return_tuple=True): super(ProvidesRequiresTask, self).__init__(name=name, provides=provides, requires=requires) + self.return_tuple = isinstance(provides, (tuple, list)) def execute(self, context, *args, **kwargs): if ORDER_KEY not in context: context[ORDER_KEY] = [] - context[ORDER_KEY].append(self.name) - outs = [] - for i in xrange(0, len(self.provides)): - outs.append(i) - return outs + context[ORDER_KEY].append({ + 'name': self.name, + KWARGS_KEY: kwargs, + ARGS_KEY: args, + }) + if self.return_tuple: + outs = [] + for i in xrange(0, len(self.provides)): + outs.append(i) + return tuple(outs) + else: + outs = {} + for k in self.provides.keys(): + outs[k] = k + return outs class DummyTask(task.Task):