Merge "Cleanup WBE example to be simpler to understand"

This commit is contained in:
Jenkins 2014-09-03 17:51:38 +00:00 committed by Gerrit Code Review
commit 3ea6271697
7 changed files with 163 additions and 198 deletions

View File

@ -151,3 +151,15 @@ Controlling retries using a retry controller
:language: python
:linenos:
:lines: 16-
Distributed execution (simple)
==============================
.. note::
Full source located at :example:`wbe_simple_linear`
.. literalinclude:: ../../taskflow/examples/wbe_simple_linear.py
:language: python
:linenos:
:lines: 16-

View File

@ -0,0 +1,5 @@
Running 2 workers.
Executing some work.
Execution finished.
Result = {"result1": 1, "result2": 666, "x": 111, "y": 222, "z": 333}
Stopping workers.

View File

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
import sys
import tempfile
import threading
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow.tests import utils
import example_utils # noqa
# INTRO: This example walks through a miniature workflow which shows how to
# start up a number of workers (these workers will process task execution and
# reversion requests using any provided input data) and then use an engine
# that creates a set of *capable* tasks and flows (the engine can not create
# tasks that the workers are not able to run, this will end in failure) that
# those workers will run and then executes that workflow seamlessly using the
# workers to perform the actual execution.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).
# A filesystem can also be used as the queue transport (useful as simple
# transport type that does not involve setting up a larger mq system). If this
# is false then the memory transport is used instead, both work in standalone
# setups.
USE_FILESYSTEM = False
BASE_SHARED_CONF = {
'exchange': 'taskflow',
}
WORKERS = 2
WORKER_CONF = {
# These are the tasks the worker can execute, they *must* be importable,
# typically this list is used to restrict what workers may execute to
# a smaller set of *allowed* tasks that are known to be safe (one would
# not want to allow all python code to be executed).
'tasks': [
'taskflow.tests.utils:TaskOneArgOneReturn',
'taskflow.tests.utils:TaskMultiArgOneReturn'
],
}
ENGINE_CONF = {
'engine': 'worker-based',
}
def run(engine_conf):
flow = lf.Flow('simple-linear').add(
utils.TaskOneArgOneReturn(provides='result1'),
utils.TaskMultiArgOneReturn(provides='result2')
)
eng = engines.load(flow,
store=dict(x=111, y=222, z=333),
engine_conf=engine_conf)
eng.run()
return eng.storage.fetch_all()
if __name__ == "__main__":
logging.basicConfig(level=logging.ERROR)
# Setup our transport configuration and merge it into the worker and
# engine configuration so that both of those use it correctly.
shared_conf = dict(BASE_SHARED_CONF)
tmp_path = None
if USE_FILESYSTEM:
tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
shared_conf.update({
'transport': 'filesystem',
'transport_options': {
'data_folder_in': tmp_path,
'data_folder_out': tmp_path,
'polling_interval': 0.1,
},
})
else:
shared_conf.update({
'transport': 'memory',
'transport_options': {
'polling_interval': 0.1,
},
})
worker_conf = dict(WORKER_CONF)
worker_conf.update(shared_conf)
engine_conf = dict(ENGINE_CONF)
engine_conf.update(shared_conf)
workers = []
worker_topics = []
try:
# Create a set of workers to simulate actual remote workers.
print('Running %s workers.' % (WORKERS))
for i in range(0, WORKERS):
worker_conf['topic'] = 'worker-%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)
runner = threading.Thread(target=w.run)
runner.daemon = True
runner.start()
w.wait()
workers.append((runner, w.stop))
# Now use those workers to do something.
print('Executing some work.')
engine_conf['topics'] = worker_topics
result = run(engine_conf)
print('Execution finished.')
# This is done so that the test examples can work correctly
# even when the keys change order (which will happen in various
# python versions).
print("Result = %s" % json.dumps(result, sort_keys=True))
finally:
# And cleanup.
print('Stopping workers.')
while workers:
r, stopper = workers.pop()
stopper()
r.join()
if tmp_path:
example_utils.rm_path(tmp_path)

View File

@ -1,61 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import sys
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.tests import utils
LOG = logging.getLogger(__name__)
if __name__ == "__main__":
logging.basicConfig(level=logging.ERROR)
engine_conf = {
'engine': 'worker-based',
'exchange': 'taskflow',
'topics': ['test-topic'],
}
# parse command line
try:
arg = sys.argv[1]
except IndexError:
pass
else:
try:
cfg = json.loads(arg)
except ValueError:
engine_conf.update(url=arg)
else:
engine_conf.update(cfg)
finally:
LOG.debug("Worker configuration: %s\n" %
json.dumps(engine_conf, sort_keys=True, indent=4))
# create and run flow
flow = lf.Flow('simple-linear').add(
utils.TaskOneArgOneReturn(provides='result1'),
utils.TaskMultiArgOneReturn(provides='result2')
)
eng = taskflow.engines.load(flow,
store=dict(x=111, y=222, z=333),
engine_conf=engine_conf)
eng.run()
print(json.dumps(eng.storage.fetch_all(), sort_keys=True))

View File

@ -1,58 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import sys
from taskflow.engines.worker_based import worker as w
LOG = logging.getLogger(__name__)
if __name__ == "__main__":
logging.basicConfig(level=logging.ERROR)
worker_conf = {
'exchange': 'taskflow',
'topic': 'test-topic',
'tasks': [
'taskflow.tests.utils:TaskOneArgOneReturn',
'taskflow.tests.utils:TaskMultiArgOneReturn'
]
}
# parse command line
try:
arg = sys.argv[1]
except IndexError:
pass
else:
try:
cfg = json.loads(arg)
except ValueError:
worker_conf.update(url=arg)
else:
worker_conf.update(cfg)
finally:
LOG.debug("Worker configuration: %s\n" %
json.dumps(worker_conf, sort_keys=True, indent=4))
# run worker
worker = w.Worker(**worker_conf)
try:
worker.run()
except KeyboardInterrupt:
pass

View File

@ -1,6 +0,0 @@
Run worker.
Run flow.
{"result1": 1, "result2": 666, "x": 111, "y": 222, "z": 333}
Flow finished.
Stop worker.

View File

@ -1,73 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import subprocess
import sys
import tempfile
self_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, self_dir)
import example_utils # noqa
def _path_to(name):
return os.path.abspath(os.path.join(os.path.dirname(__file__),
'worker_based', name))
def run_test(name, config):
cmd = [sys.executable, _path_to(name), config]
process = subprocess.Popen(cmd, stdin=None, stdout=subprocess.PIPE,
stderr=sys.stderr)
return process, cmd
def main():
tmp_path = None
try:
tmp_path = tempfile.mkdtemp(prefix='worker-based-example-')
config = json.dumps({
'transport': 'filesystem',
'transport_options': {
'data_folder_in': tmp_path,
'data_folder_out': tmp_path
}
})
print('Run worker.')
worker_process, _ = run_test('worker.py', config)
print('Run flow.')
flow_process, flow_cmd = run_test('flow.py', config)
stdout, _ = flow_process.communicate()
rc = flow_process.returncode
if rc != 0:
raise RuntimeError("Could not run %s [%s]" % (flow_cmd, rc))
print(stdout.decode())
print('Flow finished.')
print('Stop worker.')
worker_process.terminate()
finally:
if tmp_path is not None:
example_utils.rm_path(tmp_path)
if __name__ == '__main__':
main()