diff --git a/doc/source/examples.rst b/doc/source/examples.rst index 86766641..9199bc11 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -151,3 +151,15 @@ Controlling retries using a retry controller :language: python :linenos: :lines: 16- + +Distributed execution (simple) +============================== + +.. note:: + + Full source located at :example:`wbe_simple_linear` + +.. literalinclude:: ../../taskflow/examples/wbe_simple_linear.py + :language: python + :linenos: + :lines: 16- diff --git a/taskflow/examples/wbe_simple_linear.out.txt b/taskflow/examples/wbe_simple_linear.out.txt new file mode 100644 index 00000000..1585fb96 --- /dev/null +++ b/taskflow/examples/wbe_simple_linear.out.txt @@ -0,0 +1,5 @@ +Running 2 workers. +Executing some work. +Execution finished. +Result = {"result1": 1, "result2": 666, "x": 111, "y": 222, "z": 333} +Stopping workers. diff --git a/taskflow/examples/wbe_simple_linear.py b/taskflow/examples/wbe_simple_linear.py new file mode 100644 index 00000000..e28579f8 --- /dev/null +++ b/taskflow/examples/wbe_simple_linear.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +import os +import sys +import tempfile +import threading + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.engines.worker_based import worker +from taskflow.patterns import linear_flow as lf +from taskflow.tests import utils + +import example_utils # noqa + +# INTRO: This example walks through a miniature workflow which shows how to +# start up a number of workers (these workers will process task execution and +# reversion requests using any provided input data) and then use an engine +# that creates a set of *capable* tasks and flows (the engine can not create +# tasks that the workers are not able to run, this will end in failure) that +# those workers will run and then executes that workflow seamlessly using the +# workers to perform the actual execution. +# +# NOTE(harlowja): this example simulates the expected larger number of workers +# by using a set of threads (which in this example simulate the remote workers +# that would typically be running on other external machines). + +# A filesystem can also be used as the queue transport (useful as simple +# transport type that does not involve setting up a larger mq system). If this +# is false then the memory transport is used instead, both work in standalone +# setups. +USE_FILESYSTEM = False +BASE_SHARED_CONF = { + 'exchange': 'taskflow', +} +WORKERS = 2 +WORKER_CONF = { + # These are the tasks the worker can execute, they *must* be importable, + # typically this list is used to restrict what workers may execute to + # a smaller set of *allowed* tasks that are known to be safe (one would + # not want to allow all python code to be executed). + 'tasks': [ + 'taskflow.tests.utils:TaskOneArgOneReturn', + 'taskflow.tests.utils:TaskMultiArgOneReturn' + ], +} +ENGINE_CONF = { + 'engine': 'worker-based', +} + + +def run(engine_conf): + flow = lf.Flow('simple-linear').add( + utils.TaskOneArgOneReturn(provides='result1'), + utils.TaskMultiArgOneReturn(provides='result2') + ) + eng = engines.load(flow, + store=dict(x=111, y=222, z=333), + engine_conf=engine_conf) + eng.run() + return eng.storage.fetch_all() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.ERROR) + + # Setup our transport configuration and merge it into the worker and + # engine configuration so that both of those use it correctly. + shared_conf = dict(BASE_SHARED_CONF) + + tmp_path = None + if USE_FILESYSTEM: + tmp_path = tempfile.mkdtemp(prefix='wbe-example-') + shared_conf.update({ + 'transport': 'filesystem', + 'transport_options': { + 'data_folder_in': tmp_path, + 'data_folder_out': tmp_path, + 'polling_interval': 0.1, + }, + }) + else: + shared_conf.update({ + 'transport': 'memory', + 'transport_options': { + 'polling_interval': 0.1, + }, + }) + worker_conf = dict(WORKER_CONF) + worker_conf.update(shared_conf) + engine_conf = dict(ENGINE_CONF) + engine_conf.update(shared_conf) + workers = [] + worker_topics = [] + + try: + # Create a set of workers to simulate actual remote workers. + print('Running %s workers.' % (WORKERS)) + for i in range(0, WORKERS): + worker_conf['topic'] = 'worker-%s' % (i + 1) + worker_topics.append(worker_conf['topic']) + w = worker.Worker(**worker_conf) + runner = threading.Thread(target=w.run) + runner.daemon = True + runner.start() + w.wait() + workers.append((runner, w.stop)) + + # Now use those workers to do something. + print('Executing some work.') + engine_conf['topics'] = worker_topics + result = run(engine_conf) + print('Execution finished.') + # This is done so that the test examples can work correctly + # even when the keys change order (which will happen in various + # python versions). + print("Result = %s" % json.dumps(result, sort_keys=True)) + finally: + # And cleanup. + print('Stopping workers.') + while workers: + r, stopper = workers.pop() + stopper() + r.join() + if tmp_path: + example_utils.rm_path(tmp_path) diff --git a/taskflow/examples/worker_based/flow.py b/taskflow/examples/worker_based/flow.py deleted file mode 100644 index 50529a81..00000000 --- a/taskflow/examples/worker_based/flow.py +++ /dev/null @@ -1,61 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import logging -import sys - -import taskflow.engines -from taskflow.patterns import linear_flow as lf -from taskflow.tests import utils - -LOG = logging.getLogger(__name__) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.ERROR) - engine_conf = { - 'engine': 'worker-based', - 'exchange': 'taskflow', - 'topics': ['test-topic'], - } - - # parse command line - try: - arg = sys.argv[1] - except IndexError: - pass - else: - try: - cfg = json.loads(arg) - except ValueError: - engine_conf.update(url=arg) - else: - engine_conf.update(cfg) - finally: - LOG.debug("Worker configuration: %s\n" % - json.dumps(engine_conf, sort_keys=True, indent=4)) - - # create and run flow - flow = lf.Flow('simple-linear').add( - utils.TaskOneArgOneReturn(provides='result1'), - utils.TaskMultiArgOneReturn(provides='result2') - ) - eng = taskflow.engines.load(flow, - store=dict(x=111, y=222, z=333), - engine_conf=engine_conf) - eng.run() - print(json.dumps(eng.storage.fetch_all(), sort_keys=True)) diff --git a/taskflow/examples/worker_based/worker.py b/taskflow/examples/worker_based/worker.py deleted file mode 100644 index 405813c7..00000000 --- a/taskflow/examples/worker_based/worker.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import logging -import sys - -from taskflow.engines.worker_based import worker as w - -LOG = logging.getLogger(__name__) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.ERROR) - worker_conf = { - 'exchange': 'taskflow', - 'topic': 'test-topic', - 'tasks': [ - 'taskflow.tests.utils:TaskOneArgOneReturn', - 'taskflow.tests.utils:TaskMultiArgOneReturn' - ] - } - - # parse command line - try: - arg = sys.argv[1] - except IndexError: - pass - else: - try: - cfg = json.loads(arg) - except ValueError: - worker_conf.update(url=arg) - else: - worker_conf.update(cfg) - finally: - LOG.debug("Worker configuration: %s\n" % - json.dumps(worker_conf, sort_keys=True, indent=4)) - - # run worker - worker = w.Worker(**worker_conf) - try: - worker.run() - except KeyboardInterrupt: - pass diff --git a/taskflow/examples/worker_based_flow.out.txt b/taskflow/examples/worker_based_flow.out.txt deleted file mode 100644 index 7b97ff93..00000000 --- a/taskflow/examples/worker_based_flow.out.txt +++ /dev/null @@ -1,6 +0,0 @@ -Run worker. -Run flow. -{"result1": 1, "result2": 666, "x": 111, "y": 222, "z": 333} - -Flow finished. -Stop worker. diff --git a/taskflow/examples/worker_based_flow.py b/taskflow/examples/worker_based_flow.py deleted file mode 100644 index ef984ee9..00000000 --- a/taskflow/examples/worker_based_flow.py +++ /dev/null @@ -1,73 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import os -import subprocess -import sys -import tempfile - -self_dir = os.path.abspath(os.path.dirname(__file__)) -sys.path.insert(0, self_dir) - -import example_utils # noqa - - -def _path_to(name): - return os.path.abspath(os.path.join(os.path.dirname(__file__), - 'worker_based', name)) - - -def run_test(name, config): - cmd = [sys.executable, _path_to(name), config] - process = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE, - stderr=sys.stderr) - return process, cmd - - -def main(): - tmp_path = None - try: - tmp_path = tempfile.mkdtemp(prefix='worker-based-example-') - config = json.dumps({ - 'transport': 'filesystem', - 'transport_options': { - 'data_folder_in': tmp_path, - 'data_folder_out': tmp_path - } - }) - - print('Run worker.') - worker_process, _ = run_test('worker.py', config) - - print('Run flow.') - flow_process, flow_cmd = run_test('flow.py', config) - stdout, _ = flow_process.communicate() - rc = flow_process.returncode - if rc != 0: - raise RuntimeError("Could not run %s [%s]" % (flow_cmd, rc)) - print(stdout.decode()) - print('Flow finished.') - - print('Stop worker.') - worker_process.terminate() - - finally: - if tmp_path is not None: - example_utils.rm_path(tmp_path) - -if __name__ == '__main__': - main()