Merge "Allow same deps for requires and provides in task"
This commit is contained in:
@@ -28,7 +28,6 @@ from oslo_utils import reflection
|
||||
import six
|
||||
from six.moves import zip as compat_zip
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow.types import sets
|
||||
from taskflow.utils import misc
|
||||
|
||||
@@ -162,10 +161,6 @@ class Atom(object):
|
||||
some action that furthers the overall flows progress. It usually also
|
||||
produces some of its own named output as a result of this process.
|
||||
|
||||
NOTE(harlowja): there can be no intersection between what this atom
|
||||
requires and what it produces (since this would be an impossible
|
||||
dependency to satisfy).
|
||||
|
||||
:param name: Meaningful name for this atom, should be something that is
|
||||
distinguishable and understandable for notification,
|
||||
debugging, storing and any other similar purposes.
|
||||
@@ -229,12 +224,6 @@ class Atom(object):
|
||||
inject_keys = frozenset(six.iterkeys(self.inject))
|
||||
self.requires -= inject_keys
|
||||
self.optional -= inject_keys
|
||||
out_of_order = self.provides.intersection(self.requires)
|
||||
if out_of_order:
|
||||
raise exceptions.DependencyFailure(
|
||||
"Atom %(item)s provides %(oo)s that are required "
|
||||
"by this atom"
|
||||
% dict(item=self.name, oo=sorted(out_of_order)))
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute(self, *args, **kwargs):
|
||||
|
||||
@@ -149,6 +149,53 @@ class EngineOptionalRequirementsTest(utils.EngineTestBase):
|
||||
self.assertEqual(result, {'a': 3, 'b': 7, 'result': 3000})
|
||||
|
||||
|
||||
class EngineMultipleResultsTest(utils.EngineTestBase):
|
||||
def test_fetch_with_a_single_result(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.TaskOneReturn(provides='x'))
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
engine.run()
|
||||
result = engine.storage.fetch('x')
|
||||
self.assertEqual(result, 1)
|
||||
|
||||
def test_fetch_with_two_results(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.TaskOneReturn(provides='x'))
|
||||
|
||||
engine = self._make_engine(flow, store={'x': 0})
|
||||
engine.run()
|
||||
result = engine.storage.fetch('x')
|
||||
self.assertEqual(result, 0)
|
||||
|
||||
def test_fetch_all_with_a_single_result(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.TaskOneReturn(provides='x'))
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
engine.run()
|
||||
result = engine.storage.fetch_all()
|
||||
self.assertEqual(result, {'x': 1})
|
||||
|
||||
def test_fetch_all_with_two_results(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.TaskOneReturn(provides='x'))
|
||||
|
||||
engine = self._make_engine(flow, store={'x': 0})
|
||||
engine.run()
|
||||
result = engine.storage.fetch_all()
|
||||
self.assertEqual(result, {'x': [0, 1]})
|
||||
|
||||
def test_task_can_update_value(self):
|
||||
flow = lf.Flow("flow")
|
||||
flow.add(utils.TaskOneArgOneReturn(requires='x', provides='x'))
|
||||
|
||||
engine = self._make_engine(flow, store={'x': 0})
|
||||
engine.run()
|
||||
result = engine.storage.fetch_all()
|
||||
self.assertEqual(result, {'x': [0, 1]})
|
||||
|
||||
|
||||
class EngineLinearFlowTest(utils.EngineTestBase):
|
||||
|
||||
def test_run_empty_flow(self):
|
||||
@@ -697,6 +744,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
|
||||
|
||||
|
||||
class SerialEngineTest(EngineTaskTest,
|
||||
EngineMultipleResultsTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
@@ -723,6 +771,7 @@ class SerialEngineTest(EngineTaskTest,
|
||||
|
||||
|
||||
class ParallelEngineWithThreadsTest(EngineTaskTest,
|
||||
EngineMultipleResultsTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
@@ -761,6 +810,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
|
||||
|
||||
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
|
||||
class ParallelEngineWithEventletTest(EngineTaskTest,
|
||||
EngineMultipleResultsTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
@@ -782,6 +832,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
|
||||
|
||||
|
||||
class ParallelEngineWithProcessTest(EngineTaskTest,
|
||||
EngineMultipleResultsTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
@@ -808,6 +859,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
|
||||
|
||||
|
||||
class WorkerBasedEngineTest(EngineTaskTest,
|
||||
EngineMultipleResultsTest,
|
||||
EngineLinearFlowTest,
|
||||
EngineParallelFlowTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
|
||||
@@ -243,10 +243,10 @@ class FlowDependenciesTest(test.TestCase):
|
||||
requires=['a']))
|
||||
|
||||
def test_task_requires_and_provides_same_values(self):
|
||||
self.assertRaises(exceptions.DependencyFailure,
|
||||
utils.TaskOneArgOneReturn,
|
||||
requires='a',
|
||||
provides='a')
|
||||
flow = lf.Flow('lf', utils.TaskOneArgOneReturn('rt', requires='x',
|
||||
provides='x'))
|
||||
self.assertEqual(flow.requires, set('x'))
|
||||
self.assertEqual(flow.provides, set('x'))
|
||||
|
||||
def test_retry_in_linear_flow_no_requirements_no_provides(self):
|
||||
flow = lf.Flow('lf', retry.AlwaysRevert('rt'))
|
||||
@@ -271,9 +271,11 @@ class FlowDependenciesTest(test.TestCase):
|
||||
self.assertEqual(flow.provides, set(['a', 'b']))
|
||||
|
||||
def test_retry_requires_and_provides_same_value(self):
|
||||
self.assertRaises(exceptions.DependencyFailure,
|
||||
retry.AlwaysRevert,
|
||||
'rt', requires=['x', 'y'], provides=['x', 'y'])
|
||||
flow = lf.Flow('lf', retry.AlwaysRevert('rt',
|
||||
requires=['x', 'y'],
|
||||
provides=['x', 'y']))
|
||||
self.assertEqual(flow.requires, set(['x', 'y']))
|
||||
self.assertEqual(flow.provides, set(['x', 'y']))
|
||||
|
||||
def test_retry_in_unordered_flow_no_requirements_no_provides(self):
|
||||
flow = uf.Flow('uf', retry.AlwaysRevert('rt'))
|
||||
|
||||
Reference in New Issue
Block a user