44f17d005f
This library no longer supports Python 2, so the usage of six can be removed. This also removes a workaround for the pickle library that was only needed on Python 2.

Change-Id: I19d298cf0f402d65f0b142dea0bf35cf992332a9
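As an illustrative sketch of the kind of cleanup this commit describes (these exact lines are not taken from the diff), code that previously reached pickle through six's compatibility layer can import it directly on Python 3, where the fast C implementation is used automatically:

    # Before (Python 2/3 compatible, via six):
    #     from six.moves import cPickle as pickle
    # After (Python 3 only):
    import pickle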
149 lines | 5.4 KiB | Python
# -*- coding: utf-8 -*-

# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import os
import string
import sys
import time

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier
from taskflow.utils import threading_utils

ANY = notifier.Notifier.ANY

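# (Notifier.ANY is the wildcard event type: a callback registered under it is
# invoked for every event the notifier emits, regardless of the event name.)
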
# INTRO: These examples show how to use a remote worker's event notification
# attribute to proxy back task event notifications to the controlling process.
#
# In this case a simple set of events is triggered by a worker running a
# task (simulated to be remote by using a kombu memory transport and threads).
# The events that the 'remote worker' produces are proxied back to the task
# that the engine is running 'remotely' and then emitted to the original
# callbacks registered in the originating engine process/thread. This creates
# a one-way *notification* channel that can be used transparently in-process
# or out-of-process (via remote workers and so on), allowing a task to signal
# to its controlling process some action that has occurred which the task may
# need to tell others about (for example, to trigger some type of response
# when the task reaches 50% done...).


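# For example (an illustrative sketch, not part of the original example), a
# callback may also be registered for a single event name instead of ANY:
#
#     reporter = EventReporter()
#     reporter.notifier.register('A', event_receiver)
#
# in which case only notifications for event 'A' are delivered back.

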
def event_receiver(event_type, details):
    """This is the callback that (in this example) doesn't do much..."""
    print("Received event '%s'" % event_type)
    print("Details = %s" % details)


class EventReporter(task.Task):
    """This is the task that will be running 'remotely' (not really remote)."""

    EVENTS = tuple(string.ascii_uppercase)
    EVENT_DELAY = 0.1

    def execute(self):
        for i, e in enumerate(self.EVENTS):
            details = {
                'leftover': self.EVENTS[i:],
            }
            self.notifier.notify(e, details)
            time.sleep(self.EVENT_DELAY)


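# (A minimal usage sketch, assuming the task is run directly instead of
# through the worker-based engine used below:
#
#     t = EventReporter()
#     t.notifier.register(ANY, event_receiver)
#     t.execute()
#
# would print a "Received event ..." pair for each letter 'A' through 'Z'.)
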
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
    'transport': 'memory',
    'transport_options': {
        'polling_interval': 0.1,
    },
}

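# (The 'memory' transport keeps all messaging in-process, which is what makes
# this example self-contained; a real deployment would instead point kombu at
# an actual broker -- e.g., hypothetically, 'transport': 'amqp' plus the
# appropriate connection details in 'transport_options'.)
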
# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 1
WORKER_CONF = {
    'tasks': [
        # Used to locate which tasks we can run (we don't want to allow
        # arbitrary code/tasks to be run by any worker since that would
        # open up a variety of vulnerabilities).
        '%s:EventReporter' % (__name__),
    ],
}


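# (When this file is executed as a script, '%s:EventReporter' % (__name__)
# expands to '__main__:EventReporter' -- a 'module:class' path the worker
# uses to import only the explicitly whitelisted task classes.)

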
def run(engine_options):
    reporter = EventReporter()
    reporter.notifier.register(ANY, event_receiver)
    flow = lf.Flow('event-reporter').add(reporter)
    eng = engines.load(flow, engine='worker-based', **engine_options)
    eng.run()


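# (For comparison -- a sketch assuming the default engine type -- the same
# flow could run entirely in-process, with no workers at all:
#
#     eng = engines.load(lf.Flow('event-reporter').add(EventReporter()))
#     eng.run()
#
# The proxying machinery in this example only matters because the task
# actually executes somewhere else.)

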
if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those objects use it correctly.
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(BASE_SHARED_CONF)
    engine_options = dict(BASE_SHARED_CONF)
    workers = []

    # These topics will be used to request worker information; the workers
    # respond with their capabilities, which the executing engine uses to
    # match pending tasks to a suitable worker. A matched task is then sent
    # to that worker for execution, and the engine waits until it finishes
    # (a response is received); the engine then either continues with other
    # tasks, performs retry/failure resolution logic, or stops (potentially
    # re-raising the remote worker's failure)...
    worker_topics = []

    try:
        # Create a set of worker threads to simulate actual remote workers...
        print('Running %s workers.' % (MEMORY_WORKERS))
        for i in range(0, MEMORY_WORKERS):
            # Give each one its own unique topic name so that they can
            # correctly communicate with the engine (they will all share the
            # same exchange).
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Wait until the worker is up and running before continuing.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        run(engine_options)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()