From 4da581c168166f64ee076d32038a760d74bd2afa Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 11 Feb 2015 14:44:56 -0800 Subject: [PATCH] Improve upon/adjust/move around new optional example Instead of having the optional requirements example be a example that is itself a unittest just move the example to be an actual unit test that gets tested using the various engine types and change the example to be something slightly different (but shows the same kind of usage information). Change-Id: Ia03a81a6be636c501a35e7e290f587f7d05f8b30 --- doc/source/examples.rst | 12 ++ taskflow/examples/distance_calculator.py | 109 ++++++++++++++++++ taskflow/examples/optional_arguments.py | 93 --------------- taskflow/tests/unit/test_engines.py | 76 ++++++++++-- .../tests/unit/worker_based/test_worker.py | 2 +- taskflow/tests/utils.py | 6 + 6 files changed, 196 insertions(+), 102 deletions(-) create mode 100644 taskflow/examples/distance_calculator.py delete mode 100644 taskflow/examples/optional_arguments.py diff --git a/doc/source/examples.rst b/doc/source/examples.rst index d30bd85f9..f1ebdc7eb 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -94,6 +94,18 @@ Watching execution timing :linenos: :lines: 16- +Distance calculator +=================== + +.. note:: + + Full source located at :example:`distance_calculator` + +.. literalinclude:: ../../taskflow/examples/distance_calculator.py + :language: python + :linenos: + :lines: 16- + Table multiplier (in parallel) ============================== diff --git a/taskflow/examples/distance_calculator.py b/taskflow/examples/distance_calculator.py new file mode 100644 index 000000000..5112dfb5a --- /dev/null +++ b/taskflow/examples/distance_calculator.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import math +import os +import sys + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.patterns import linear_flow +from taskflow import task + +# INTRO: This shows how to use a tasks/atoms ability to take requirements from +# its execute functions default parameters and shows how to provide those +# via different methods when needed, to influence those parameters to in +# this case calculate the distance between two points in 2D space. + +# A 2D point. +Point = collections.namedtuple("Point", "x,y") + + +def is_near(val, expected, tolerance=0.001): + # Floats don't really provide equality... + if val > (expected + tolerance): + return False + if val < (expected - tolerance): + return False + return True + + +class DistanceTask(task.Task): + # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space + + default_provides = 'distance' + + def execute(self, a=Point(0, 0), b=Point(0, 0)): + return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2)) + + +if __name__ == '__main__': + # For these we rely on the execute() methods points by default being + # at the origin (and we override it with store values when we want) at + # execution time (which then influences what is calculated). + any_distance = linear_flow.Flow("origin").add(DistanceTask()) + results = engines.run(any_distance) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 0.0, + is_near(results['distance'], 0.0))) + + results = engines.run(any_distance, store={'a': Point(1, 1)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 1.4142, + is_near(results['distance'], + 1.4142))) + + results = engines.run(any_distance, store={'a': Point(10, 10)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 14.14199, + is_near(results['distance'], + 14.14199))) + + results = engines.run(any_distance, + store={'a': Point(5, 5), 'b': Point(10, 10)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 7.07106, + is_near(results['distance'], + 7.07106))) + + # For this we use the ability to override at task creation time the + # optional arguments so that we don't need to continue to send them + # in via the 'store' argument like in the above (and we fix the new + # starting point 'a' at (10, 10) instead of (0, 0)... + + ten_distance = linear_flow.Flow("ten") + ten_distance.add(DistanceTask(inject={'a': Point(10, 10)})) + results = engines.run(ten_distance, store={'b': Point(10, 10)}) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 0.0, + is_near(results['distance'], 0.0))) + + results = engines.run(ten_distance) + print(results) + print("%s is near-enough to %s: %s" % (results['distance'], + 14.14199, + is_near(results['distance'], + 14.14199))) diff --git a/taskflow/examples/optional_arguments.py b/taskflow/examples/optional_arguments.py deleted file mode 100644 index 66a4d380d..000000000 --- a/taskflow/examples/optional_arguments.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (C) 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import unittest - -from taskflow import engines -from taskflow.patterns import linear_flow -from taskflow import task - - -class TestTask(task.Task): - def execute(self, a, b=5): - result = a * b - return result - -flow_no_inject = linear_flow.Flow("flow").add(TestTask(provides='result')) -flow_inject_a = linear_flow.Flow("flow").add(TestTask(provides='result', - inject={'a': 10})) -flow_inject_b = linear_flow.Flow("flow").add(TestTask(provides='result', - inject={'b': 1000})) - -ASSERT = True - - -class MyTest(unittest.TestCase): - def test_my_test(self): - print("Expected result = 15") - result = engines.run(flow_no_inject, store={'a': 3}) - print(result) - if ASSERT: - self.assertEqual(result, - {'a': 3, 'result': 15} - ) - - print("Expected result = 39") - result = engines.run(flow_no_inject, store={'a': 3, 'b': 7}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'b': 7, 'result': 21} - ) - - print("Expected result = 200") - result = engines.run(flow_inject_a, store={'a': 3}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'result': 50} - ) - - print("Expected result = 400") - result = engines.run(flow_inject_a, store={'a': 3, 'b': 7}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'b': 7, 'result': 70} - ) - - print("Expected result = 40") - result = engines.run(flow_inject_b, store={'a': 3}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'result': 3000} - ) - - print("Expected result = 40") - result = engines.run(flow_inject_b, store={'a': 3, 'b': 7}) - print(result) - if ASSERT: - self.assertEqual( - result, - {'a': 3, 'b': 7, 'result': 3000} - ) - -if __name__ == '__main__': - unittest.main() diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 8762d386d..9176b9d67 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -104,6 +104,51 @@ class EngineTaskTest(object): self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) +class EngineOptionalRequirementsTest(utils.EngineTestBase): + def test_expected_optional_multiplers(self): + flow_no_inject = lf.Flow("flow") + flow_no_inject.add(utils.OptionalTask(provides='result')) + + flow_inject_a = lf.Flow("flow") + flow_inject_a.add(utils.OptionalTask(provides='result', + inject={'a': 10})) + + flow_inject_b = lf.Flow("flow") + flow_inject_b.add(utils.OptionalTask(provides='result', + inject={'b': 1000})) + + engine = self._make_engine(flow_no_inject, store={'a': 3}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'result': 15}) + + engine = self._make_engine(flow_no_inject, + store={'a': 3, 'b': 7}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'b': 7, 'result': 21}) + + engine = self._make_engine(flow_inject_a, store={'a': 3}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'result': 50}) + + engine = self._make_engine(flow_inject_a, store={'a': 3, 'b': 7}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'b': 7, 'result': 70}) + + engine = self._make_engine(flow_inject_b, store={'a': 3}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'result': 3000}) + + engine = self._make_engine(flow_inject_b, store={'a': 3, 'b': 7}) + engine.run() + result = engine.storage.fetch_all() + self.assertEqual(result, {'a': 3, 'b': 7, 'result': 3000}) + + class EngineLinearFlowTest(utils.EngineTestBase): def test_run_empty_flow(self): @@ -601,14 +646,17 @@ class SerialEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineCheckingTaskTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None): + def _make_engine(self, flow, + flow_detail=None, store=None): return taskflow.engines.load(flow, flow_detail=flow_detail, engine='serial', - backend=self.backend) + backend=self.backend, + store=store) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -623,18 +671,21 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineCheckingTaskTest, test.TestCase): _EXECUTOR_WORKERS = 2 - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, + flow_detail=None, executor=None, store=None): if executor is None: executor = 'threads' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, executor=executor, engine='parallel', + store=store, max_workers=self._EXECUTOR_WORKERS) def test_correct_load(self): @@ -657,23 +708,27 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineCheckingTaskTest, test.TestCase): - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, + flow_detail=None, executor=None, store=None): if executor is None: executor = futures.GreenThreadPoolExecutor() self.addCleanup(executor.shutdown) return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', - executor=executor) + executor=executor, + store=store) class ParallelEngineWithProcessTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -682,13 +737,15 @@ class ParallelEngineWithProcessTest(EngineTaskTest, engine = self._make_engine(utils.TaskNoRequiresNoReturns) self.assertIsInstance(engine, eng.ParallelActionEngine) - def _make_engine(self, flow, flow_detail=None, executor=None): + def _make_engine(self, flow, + flow_detail=None, executor=None, store=None): if executor is None: executor = 'processes' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor, + store=store, max_workers=self._EXECUTOR_WORKERS) @@ -696,6 +753,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineLinearFlowTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, + EngineOptionalRequirementsTest, EngineGraphFlowTest, test.TestCase): def setUp(self): @@ -740,9 +798,11 @@ class WorkerBasedEngineTest(EngineTaskTest, self.worker_thread.join() super(WorkerBasedEngineTest, self).tearDown() - def _make_engine(self, flow, flow_detail=None): + def _make_engine(self, flow, + flow_detail=None, store=None): return taskflow.engines.load(flow, flow_detail=flow_detail, - backend=self.backend, **self.engine_conf) + backend=self.backend, + store=store, **self.engine_conf) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index cc4578c05..7020a931b 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -34,7 +34,7 @@ class TestWorker(test.MockTestCase): self.exchange = 'test-exchange' self.topic = 'test-topic' self.threads_count = 5 - self.endpoint_count = 21 + self.endpoint_count = 22 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 2594798f7..fbbb83c77 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -157,6 +157,12 @@ class FailingTask(ProgressingTask): raise RuntimeError('Woot!') +class OptionalTask(task.Task): + def execute(self, a, b=5): + result = a * b + return result + + class TaskWithFailure(task.Task): def execute(self, **kwargs):