Merge "Allow provides to be a set and results to be a dictionary"
This commit is contained in:
@@ -33,6 +33,10 @@ class TaskAction(base.Action):
|
||||
try:
|
||||
self._id = engine.storage.get_uuid_by_name(self._task.name)
|
||||
except exceptions.NotFound:
|
||||
# TODO(harlowja): we might need to save whether the results of this
|
||||
# task will be a tuple + other additional metadata when doing this
|
||||
# add to the underlying storage backend for later resumption of
|
||||
# this task.
|
||||
self._id = uuidutils.generate_uuid()
|
||||
engine.storage.add_task(task_name=self.name, uuid=self.uuid)
|
||||
engine.storage.set_result_mapping(self.uuid, self._result_mapping)
|
||||
|
||||
@@ -168,6 +168,16 @@ class Storage(object):
|
||||
else:
|
||||
return result[index]
|
||||
except exceptions.NotFound:
|
||||
# NOTE(harlowja): No result was found for the given uuid.
|
||||
pass
|
||||
except (KeyError, IndexError, TypeError):
|
||||
# NOTE(harlowja): The result that the uuid returned can not be
|
||||
# accessed in the manner that the index is requesting. Perhaps
|
||||
# the result is a dictionary-like object and that key does
|
||||
# not exist (key error), or the result is a tuple/list and a
|
||||
# non-numeric key is being requested (index error), or there
|
||||
# was no result and an attempt to index into None is being
|
||||
# requested (type error).
|
||||
pass
|
||||
raise exceptions.NotFound("Unable to find result %r" % name)
|
||||
|
||||
|
||||
@@ -28,14 +28,29 @@ def _save_as_to_mapping(save_as):
|
||||
|
||||
Result should follow storage convention for mappings.
|
||||
"""
|
||||
# TODO(harlowja): we should probably document this behavior & convention
|
||||
# outside of code so that its more easily understandable, since what a task
|
||||
# returns is pretty crucial for other later operations.
|
||||
if save_as is None:
|
||||
return {}
|
||||
if isinstance(save_as, basestring):
|
||||
# NOTE(harlowja): this means that your task will only return one item
|
||||
# instead of a dictionary-like object or a indexable object (like a
|
||||
# list or tuple).
|
||||
return {save_as: None}
|
||||
elif isinstance(save_as, (tuple, list)):
|
||||
# NOTE(harlowja): this means that your task will return a indexable
|
||||
# object, like a list or tuple and the results can be mapped by index
|
||||
# to that tuple/list that is returned for others to use.
|
||||
return dict((key, num) for num, key in enumerate(save_as))
|
||||
elif isinstance(save_as, set):
|
||||
# NOTE(harlowja): in the case where a set is given we will not be
|
||||
# able to determine the numeric ordering in a reliable way (since it is
|
||||
# a unordered set) so the only way for us to easily map the result of
|
||||
# the task will be via the key itself.
|
||||
return dict((key, key) for key in save_as)
|
||||
raise TypeError('Task provides parameter '
|
||||
'should be str or tuple/list, not %r' % save_as)
|
||||
'should be str, set or tuple/list, not %r' % save_as)
|
||||
|
||||
|
||||
def _build_rebind_dict(args, rebind_args):
|
||||
|
||||
@@ -91,6 +91,14 @@ class MultiargsTask(task.Task):
|
||||
return a + b + c
|
||||
|
||||
|
||||
class MultiDictTask(task.Task):
|
||||
def execute(self):
|
||||
output = {}
|
||||
for i, k in enumerate(sorted(self.provides.keys())):
|
||||
output[k] = i
|
||||
return output
|
||||
|
||||
|
||||
class EngineTestBase(object):
|
||||
def setUp(self):
|
||||
super(EngineTestBase, self).setUp()
|
||||
@@ -186,6 +194,16 @@ class EngineTaskTest(EngineTestBase):
|
||||
'snake': 1
|
||||
})
|
||||
|
||||
def test_save_dict(self):
|
||||
flow = MultiDictTask(provides=set(['badger', 'mushroom', 'snake']))
|
||||
engine = self._make_engine(flow)
|
||||
engine.run()
|
||||
self.assertEquals(engine.storage.fetch_all(), {
|
||||
'badger': 0,
|
||||
'mushroom': 1,
|
||||
'snake': 2,
|
||||
})
|
||||
|
||||
def test_bad_save_as_value(self):
|
||||
with self.assertRaises(TypeError):
|
||||
TestTask(name='task1', provides=object())
|
||||
|
||||
@@ -242,10 +242,30 @@ class LinearFlowTest(test.TestCase):
|
||||
e = _make_engine(wf)
|
||||
self.assertRaises(exc.NotFound, e.run)
|
||||
|
||||
def test_flow_good_order(self):
|
||||
def test_flow_set_order(self):
|
||||
wf = lw.Flow("the-test-action")
|
||||
wf.add(utils.ProvidesRequiresTask('test-1',
|
||||
requires=set(),
|
||||
requires=[],
|
||||
provides=set(['a', 'b'])))
|
||||
wf.add(utils.ProvidesRequiresTask('test-2',
|
||||
requires=set(['a', 'b']),
|
||||
provides=set([])))
|
||||
e = _make_engine(wf)
|
||||
e.run()
|
||||
run_context = e.storage.fetch('context')
|
||||
ordering = run_context[utils.ORDER_KEY]
|
||||
self.assertEquals(2, len(ordering))
|
||||
self.assertEquals('test-1', ordering[0]['name'])
|
||||
self.assertEquals('test-2', ordering[1]['name'])
|
||||
self.assertEquals({'a': 'a', 'b': 'b'},
|
||||
ordering[1][utils.KWARGS_KEY])
|
||||
self.assertEquals({},
|
||||
ordering[0][utils.KWARGS_KEY])
|
||||
|
||||
def test_flow_list_order(self):
|
||||
wf = lw.Flow("the-test-action")
|
||||
wf.add(utils.ProvidesRequiresTask('test-1',
|
||||
requires=[],
|
||||
provides=['a', 'b']))
|
||||
wf.add(utils.ProvidesRequiresTask('test-2',
|
||||
requires=['a', 'b'],
|
||||
@@ -265,3 +285,7 @@ class LinearFlowTest(test.TestCase):
|
||||
|
||||
e = _make_engine(wf)
|
||||
e.run()
|
||||
run_context = e.storage.fetch('context')
|
||||
ordering = run_context[utils.ORDER_KEY]
|
||||
for i, entry in enumerate(ordering):
|
||||
self.assertEquals('test-%s' % (i + 1), entry['name'])
|
||||
|
||||
@@ -40,19 +40,30 @@ def drain(lst):
|
||||
|
||||
|
||||
class ProvidesRequiresTask(task.Task):
|
||||
def __init__(self, name, provides, requires):
|
||||
def __init__(self, name, provides, requires, return_tuple=True):
|
||||
super(ProvidesRequiresTask, self).__init__(name=name,
|
||||
provides=provides,
|
||||
requires=requires)
|
||||
self.return_tuple = isinstance(provides, (tuple, list))
|
||||
|
||||
def execute(self, context, *args, **kwargs):
|
||||
if ORDER_KEY not in context:
|
||||
context[ORDER_KEY] = []
|
||||
context[ORDER_KEY].append(self.name)
|
||||
outs = []
|
||||
for i in xrange(0, len(self.provides)):
|
||||
outs.append(i)
|
||||
return outs
|
||||
context[ORDER_KEY].append({
|
||||
'name': self.name,
|
||||
KWARGS_KEY: kwargs,
|
||||
ARGS_KEY: args,
|
||||
})
|
||||
if self.return_tuple:
|
||||
outs = []
|
||||
for i in xrange(0, len(self.provides)):
|
||||
outs.append(i)
|
||||
return tuple(outs)
|
||||
else:
|
||||
outs = {}
|
||||
for k in self.provides.keys():
|
||||
outs[k] = k
|
||||
return outs
|
||||
|
||||
|
||||
class DummyTask(task.Task):
|
||||
|
||||
Reference in New Issue
Block a user