diff --git a/doc/source/examples.rst b/doc/source/examples.rst index d30bd85f9..f1ebdc7eb 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -94,6 +94,18 @@ Watching execution timing :linenos: :lines: 16- +Distance calculator +=================== + +.. note:: + + Full source located at :example:`distance_calculator` + +.. literalinclude:: ../../taskflow/examples/distance_calculator.py + :language: python + :linenos: + :lines: 16- + Table multiplier (in parallel) ============================== diff --git a/taskflow/examples/distance_calculator.py b/taskflow/examples/distance_calculator.py new file mode 100644 index 000000000..5112dfb5a --- /dev/null +++ b/taskflow/examples/distance_calculator.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import math +import os +import sys + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.patterns import linear_flow +from taskflow import task + +# INTRO: This shows how to use a tasks/atoms ability to take requirements from +# its execute functions default parameters and shows how to provide those +# via different methods when needed, to influence those parameters to in +# this case calculate the distance between two points in 2D space. + +# A 2D point. +Point = collections.namedtuple("Point", "x,y") + + +def is_near(val, expected, tolerance=0.001): + # Floats don't really provide equality... + if val > (expected + tolerance): + return False + if val < (expected - tolerance): + return False + return True + + +class DistanceTask(task.Task): + # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space + + default_provides = 'distance' + + def execute(self, a=Point(0, 0), b=Point(0, 0)): + return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2)) + + +if __name__ == '__main__': + # For these we rely on the execute() methods points by default being + # at the origin (and we override it with store values when we want) at + # execution time (which then influences what is calculated). + any_distance = linear_flow.Flow("origin").add(DistanceTask()) + results = engines.run(any_distance) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 0.0, + is_near(results['distance'], 0.0))) + + results = engines.run(any_distance, store={'a': Point(1, 1)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 1.4142, + is_near(results['distance'], + 1.4142))) + + results = engines.run(any_distance, store={'a': Point(10, 10)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 14.14199, + is_near(results['distance'], + 14.14199))) + + results = engines.run(any_distance, + store={'a': Point(5, 5), 'b': Point(10, 10)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 7.07106, + is_near(results['distance'], + 7.07106))) + + # For this we use the ability to override at task creation time the + # optional arguments so that we don't need to continue to send them + # in via the 'store' argument like in the above (and we fix the new + # starting point 'a' at (10, 10) instead of (0, 0)... + + ten_distance = linear_flow.Flow("ten") + ten_distance.add(DistanceTask(inject={'a': Point(10, 10)})) + results = engines.run(ten_distance, store={'b': Point(10, 10)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 0.0, + is_near(results['distance'], 0.0))) + + results = engines.run(ten_distance) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 14.14199, + is_near(results['distance'], + 14.14199))) diff --git a/taskflow/examples/optional_arguments.py b/taskflow/examples/optional_arguments.py deleted file mode 100644 index 66a4d380d..000000000 --- a/taskflow/examples/optional_arguments.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (C) 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import unittest - -from taskflow import engines -from taskflow.patterns import linear_flow -from taskflow import task - - -class TestTask(task.Task): - def execute(self, a, b=5): - result = a * b - return result - -flow_no_inject = linear_flow.Flow("flow").add(TestTask(provides='result')) -flow_inject_a = linear_flow.Flow("flow").add(TestTask(provides='result', - inject={'a': 10})) -flow_inject_b = linear_flow.Flow("flow").add(TestTask(provides='result', - inject={'b': 1000})) - -ASSERT = True - - -class MyTest(unittest.TestCase): - def test_my_test(self): - print("Expected result = 15") - result = engines.run(flow_no_inject, store={'a': 3}) - print(result) - if ASSERT: - self.assertEqual(result, - {'a': 3, 'result': 15} - ) - - print("Expected result = 39") - result = engines.run(flow_no_inject, store={'a': 3, 'b': 7}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'b': 7, 'result': 21} - ) - - print("Expected result = 200") - result = engines.run(flow_inject_a, store={'a': 3}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'result': 50} - ) - - print("Expected result = 400") - result = engines.run(flow_inject_a, store={'a': 3, 'b': 7}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'b': 7, 'result': 70} - ) - - print("Expected result = 40") - result = engines.run(flow_inject_b, store={'a': 3}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'result': 3000} - ) - - print("Expected result = 40") - result = engines.run(flow_inject_b, store={'a': 3, 'b': 7}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'b': 7, 'result': 3000} - ) - -if __name__ == '__main__': - unittest.main() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 8762d386d..9176b9d67 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -104,6 +104,51 @@ class EngineTaskTest(object): self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) +class EngineOptionalRequirementsTest(utils.EngineTestBase): + def test_expected_optional_multiplers(self): + flow_no_inject = lf.Flow("flow") + flow_no_inject.add(utils.OptionalTask(provides='result')) + + flow_inject_a = lf.Flow("flow") + flow_inject_a.add(utils.OptionalTask(provides='result', + inject={'a': 10})) + + flow_inject_b = lf.Flow("flow") + flow_inject_b.add(utils.OptionalTask(provides='result', + inject={'b': 1000})) + + engine = self._make_engine(flow_no_inject, store={'a': 3}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'result': 15}) + + engine = self._make_engine(flow_no_inject, + store={'a': 3, 'b': 7}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'b': 7, 'result': 21}) + + engine = self._make_engine(flow_inject_a, store={'a': 3}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'result': 50}) + + engine = self._make_engine(flow_inject_a, store={'a': 3, 'b': 7}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'b': 7, 'result': 70}) + + engine = self._make_engine(flow_inject_b, store={'a': 3}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'result': 3000}) + + engine = self._make_engine(flow_inject_b, store={'a': 3, 'b': 7}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'b': 7, 'result': 3000}) + + class EngineLinearFlowTest(utils.EngineTestBase): def test_run_empty_flow(self): @@ -601,14 +646,17 @@ class SerialEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineCheckingTaskTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None): + def _make_engine(self, flow, + flow_detail=None, store=None): return taskflow.engines.load(flow, flow_detail=flow_detail, engine='serial', - backend=self.backend) + backend=self.backend, + store=store) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -623,18 +671,21 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineCheckingTaskTest, test.TestCase): _EXECUTOR_WORKERS = 2 - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, + flow_detail=None, executor=None, store=None): if executor is None: executor = 'threads' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, executor=executor, engine='parallel', + store=store, max_workers=self._EXECUTOR_WORKERS) def test_correct_load(self): @@ -657,23 +708,27 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineCheckingTaskTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, + flow_detail=None, executor=None, store=None): if executor is None: executor = futures.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', - executor=executor) + executor=executor, + store=store) class ParallelEngineWithProcessTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -682,13 +737,15 @@ class ParallelEngineWithProcessTest(EngineTaskTest, engine = self._make_engine(utils.TaskNoRequiresNoReturns) self.assertIsInstance(engine, eng.ParallelActionEngine) - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, + flow_detail=None, executor=None, store=None): if executor is None: executor = 'processes' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor, + store=store, max_workers=self._EXECUTOR_WORKERS) @@ -696,6 +753,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, test.TestCase): def setUp(self): @@ -740,9 +798,11 @@ class WorkerBasedEngineTest(EngineTaskTest, self.worker_thread.join() super(WorkerBasedEngineTest, self).tearDown() - def _make_engine(self, flow, flow_detail=None): + def _make_engine(self, flow, + flow_detail=None, store=None): return taskflow.engines.load(flow, flow_detail=flow_detail, - backend=self.backend, **self.engine_conf) + backend=self.backend, + store=store, **self.engine_conf) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index cc4578c05..7020a931b 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -34,7 +34,7 @@ class TestWorker(test.MockTestCase): self.exchange = 'test-exchange' self.topic = 'test-topic' self.threads_count = 5 - self.endpoint_count = 21 + self.endpoint_count = 22 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 2594798f7..fbbb83c77 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -157,6 +157,12 @@ class FailingTask(ProgressingTask): raise RuntimeError('Woot!') +class OptionalTask(task.Task): + def execute(self, a, b=5): + result = a * b + return result + + class TaskWithFailure(task.Task): def execute(self, **kwargs):