Merge "Add an example which shows how to send events out from tasks"
This commit is contained in:
@@ -236,6 +236,18 @@ Distributed execution (simple)
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Distributed notification (simple)
|
||||
=================================
|
||||
|
||||
.. note::
|
||||
|
||||
Full source located at :example:`wbe_event_sender`
|
||||
|
||||
.. literalinclude:: ../../taskflow/examples/wbe_event_sender.py
|
||||
:language: python
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Distributed mandelbrot (complex)
|
||||
================================
|
||||
|
||||
|
||||
148
taskflow/examples/wbe_event_sender.py
Normal file
148
taskflow/examples/wbe_event_sender.py
Normal file
@@ -0,0 +1,148 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import string
|
||||
import sys
|
||||
import time
|
||||
|
||||
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
sys.path.insert(0, top_dir)
|
||||
|
||||
from six.moves import range as compat_range
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow.engines.worker_based import worker
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow import task
|
||||
from taskflow.types import notifier
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
# INTRO: This examples shows how to use a remote workers event notification
|
||||
# attribute to proxy back task event notifications to the controlling process.
|
||||
#
|
||||
# In this case a simple set of events are triggered by a worker running a
|
||||
# task (simulated to be remote by using a kombu memory transport and threads).
|
||||
# Those events that the 'remote worker' produces will then be proxied back to
|
||||
# the task that the engine is running 'remotely', and then they will be emitted
|
||||
# back to the original callbacks that exist in the originating engine
|
||||
# process/thread. This creates a one-way *notification* channel that can
|
||||
# transparently be used in-process, outside-of-process using remote workers and
|
||||
# so-on that allows tasks to signal to its controlling process some sort of
|
||||
# action that has occurred that the task may need to tell others about (for
|
||||
# example to trigger some type of response when the task reaches 50% done...).
|
||||
|
||||
|
||||
def event_receiver(event_type, details):
|
||||
"""This is the callback that (in this example) doesn't do much..."""
|
||||
print("Recieved event '%s'" % event_type)
|
||||
print("Details = %s" % details)
|
||||
|
||||
|
||||
class EventReporter(task.Task):
|
||||
"""This is the task that will be running 'remotely' (not really remote)."""
|
||||
|
||||
EVENTS = tuple(string.ascii_uppercase)
|
||||
EVENT_DELAY = 0.1
|
||||
|
||||
def execute(self):
|
||||
for i, e in enumerate(self.EVENTS):
|
||||
details = {
|
||||
'leftover': self.EVENTS[i:],
|
||||
}
|
||||
self.notifier.notify(e, details)
|
||||
time.sleep(self.EVENT_DELAY)
|
||||
|
||||
|
||||
BASE_SHARED_CONF = {
|
||||
'exchange': 'taskflow',
|
||||
'transport': 'memory',
|
||||
'transport_options': {
|
||||
'polling_interval': 0.1,
|
||||
},
|
||||
}
|
||||
|
||||
# Until https://github.com/celery/kombu/issues/398 is resolved it is not
|
||||
# recommended to run many worker threads in this example due to the types
|
||||
# of errors mentioned in that issue.
|
||||
MEMORY_WORKERS = 1
|
||||
WORKER_CONF = {
|
||||
'tasks': [
|
||||
# Used to locate which tasks we can run (we don't want to allow
|
||||
# arbitrary code/tasks to be ran by any worker since that would
|
||||
# open up a variety of vulnerabilities).
|
||||
'%s:EventReporter' % (__name__),
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def run(engine_options):
|
||||
reporter = EventReporter()
|
||||
reporter.notifier.register(notifier.Notifier.ANY, event_receiver)
|
||||
flow = lf.Flow('event-reporter').add(reporter)
|
||||
eng = engines.load(flow, engine='worker-based', **engine_options)
|
||||
eng.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
|
||||
# Setup our transport configuration and merge it into the worker and
|
||||
# engine configuration so that both of those objects use it correctly.
|
||||
worker_conf = dict(WORKER_CONF)
|
||||
worker_conf.update(BASE_SHARED_CONF)
|
||||
engine_options = dict(BASE_SHARED_CONF)
|
||||
workers = []
|
||||
|
||||
# These topics will be used to request worker information on; those
|
||||
# workers will respond with there capabilities which the executing engine
|
||||
# will use to match pending tasks to a matched worker, this will cause
|
||||
# the task to be sent for execution, and the engine will wait until it
|
||||
# is finished (a response is recieved) and then the engine will either
|
||||
# continue with other tasks, do some retry/failure resolution logic or
|
||||
# stop (and potentially re-raise the remote workers failure)...
|
||||
worker_topics = []
|
||||
|
||||
try:
|
||||
# Create a set of worker threads to simulate actual remote workers...
|
||||
print('Running %s workers.' % (MEMORY_WORKERS))
|
||||
for i in compat_range(0, MEMORY_WORKERS):
|
||||
# Give each one its own unique topic name so that they can
|
||||
# correctly communicate with the engine (they will all share the
|
||||
# same exchange).
|
||||
worker_conf['topic'] = 'worker-%s' % (i + 1)
|
||||
worker_topics.append(worker_conf['topic'])
|
||||
w = worker.Worker(**worker_conf)
|
||||
runner = threading_utils.daemon_thread(w.run)
|
||||
runner.start()
|
||||
w.wait()
|
||||
workers.append((runner, w.stop))
|
||||
|
||||
# Now use those workers to do something.
|
||||
print('Executing some work.')
|
||||
engine_options['topics'] = worker_topics
|
||||
result = run(engine_options)
|
||||
print('Execution finished.')
|
||||
finally:
|
||||
# And cleanup.
|
||||
print('Stopping workers.')
|
||||
while workers:
|
||||
r, stopper = workers.pop()
|
||||
stopper()
|
||||
r.join()
|
||||
Reference in New Issue
Block a user