Merge "Get event/notification sending working correctly"
This commit is contained in:
@@ -46,6 +46,18 @@ Building a car
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Iterating over the alphabet (using processes)
|
||||
=============================================
|
||||
|
||||
.. note::
|
||||
|
||||
Full source located at :example:`alphabet_soup`.
|
||||
|
||||
.. literalinclude:: ../../taskflow/examples/alphabet_soup.py
|
||||
:language: python
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Watching execution timing
|
||||
=========================
|
||||
|
||||
|
||||
@@ -248,6 +248,8 @@ class ParallelActionEngine(ActionEngine):
|
||||
# forcing everyone to use our derivatives...
|
||||
if isinstance(kwargs['executor'], futures.ProcessPoolExecutor):
|
||||
executor_cls = executor.ParallelProcessTaskExecutor
|
||||
kwargs['dispatch_periodicity'] = self._options.get(
|
||||
'dispatch_periodicity')
|
||||
else:
|
||||
executor_cls = executor.ParallelThreadTaskExecutor
|
||||
return executor_cls(**kwargs)
|
||||
|
||||
@@ -15,13 +15,24 @@
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import functools
|
||||
import multiprocessing
|
||||
import os
|
||||
import pickle
|
||||
import threading
|
||||
|
||||
from oslo.utils import excutils
|
||||
from oslo.utils import timeutils
|
||||
import six
|
||||
from six.moves import queue as compat_queue
|
||||
from six.moves import range as compat_range
|
||||
|
||||
from taskflow import logging
|
||||
from taskflow import task as task_atom
|
||||
from taskflow.types import failure
|
||||
from taskflow.types import futures
|
||||
from taskflow.types import notifier
|
||||
from taskflow.types import timing
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
@@ -29,10 +40,32 @@ from taskflow.utils import threading_utils
|
||||
EXECUTED = 'executed'
|
||||
REVERTED = 'reverted'
|
||||
|
||||
# See http://bugs.python.org/issue1457119 for why this is so complex...
|
||||
_PICKLE_ERRORS = [pickle.PickleError, TypeError]
|
||||
try:
|
||||
import cPickle as _cPickle
|
||||
_PICKLE_ERRORS.append(_cPickle.PickleError)
|
||||
except ImportError:
|
||||
pass
|
||||
_PICKLE_ERRORS = tuple(_PICKLE_ERRORS)
|
||||
_SEND_ERRORS = (IOError, EOFError)
|
||||
_UPDATE_PROGRESS = task_atom.EVENT_UPDATE_PROGRESS
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _maybe_forever(limit=None):
|
||||
if limit is None:
|
||||
while True:
|
||||
yield
|
||||
else:
|
||||
for i in compat_range(0, limit):
|
||||
yield
|
||||
|
||||
|
||||
def _execute_task(task, arguments, progress_callback=None):
|
||||
with notifier.register_deregister(task.notifier,
|
||||
task_atom.EVENT_UPDATE_PROGRESS,
|
||||
_UPDATE_PROGRESS,
|
||||
callback=progress_callback):
|
||||
try:
|
||||
task.pre_execute()
|
||||
@@ -51,7 +84,7 @@ def _revert_task(task, arguments, result, failures, progress_callback=None):
|
||||
arguments[task_atom.REVERT_RESULT] = result
|
||||
arguments[task_atom.REVERT_FLOW_FAILURES] = failures
|
||||
with notifier.register_deregister(task.notifier,
|
||||
task_atom.EVENT_UPDATE_PROGRESS,
|
||||
_UPDATE_PROGRESS,
|
||||
callback=progress_callback):
|
||||
try:
|
||||
task.pre_revert()
|
||||
@@ -65,6 +98,182 @@ def _revert_task(task, arguments, result, failures, progress_callback=None):
|
||||
return (REVERTED, result)
|
||||
|
||||
|
||||
class _JoinedWorkItem(object):
|
||||
"""The piece of work that will executed by a process executor.
|
||||
|
||||
This will call the target function, then wait until the queues items
|
||||
have been completed (via calls to task_done) before offically being
|
||||
finished.
|
||||
|
||||
NOTE(harlowja): this is done so that the task function will *not* return
|
||||
until all of its notifications have been proxied back to its originating
|
||||
task. If we didn't do this then the executor would see this task as done
|
||||
and then potentially start tasks that are successors of the task that just
|
||||
finished even though notifications are still left to be sent from the
|
||||
previously finished task...
|
||||
"""
|
||||
|
||||
def __init__(self, queue, func, task, *args, **kwargs):
|
||||
self._queue = queue
|
||||
self._func = func
|
||||
self._task = task
|
||||
self._args = args
|
||||
self._kwargs = kwargs
|
||||
|
||||
def __call__(self):
|
||||
args = self._args
|
||||
kwargs = self._kwargs
|
||||
try:
|
||||
return self._func(self._task, *args, **kwargs)
|
||||
finally:
|
||||
w = timing.StopWatch().start()
|
||||
self._queue.join()
|
||||
LOG.blather("Waited %0.2f seconds until task '%s' emitted"
|
||||
" notifications were depleted", w.elapsed(),
|
||||
self._task)
|
||||
|
||||
|
||||
class _EventSender(object):
|
||||
"""Sends event information from a child worker process to its creator."""
|
||||
|
||||
def __init__(self, queue):
|
||||
self._queue = queue
|
||||
self._pid = None
|
||||
|
||||
def __call__(self, event_type, details):
|
||||
# NOTE(harlowja): this is done in late in execution to ensure that this
|
||||
# happens in the child process and not the parent process (where the
|
||||
# constructor is called).
|
||||
if self._pid is None:
|
||||
self._pid = os.getpid()
|
||||
message = {
|
||||
'created_on': timeutils.utcnow(),
|
||||
'sender': {
|
||||
'pid': self._pid,
|
||||
},
|
||||
'body': {
|
||||
'event_type': event_type,
|
||||
'details': details,
|
||||
},
|
||||
}
|
||||
try:
|
||||
self._queue.put(message)
|
||||
except _PICKLE_ERRORS:
|
||||
LOG.warn("Failed serializing message %s", message, exc_info=True)
|
||||
except _SEND_ERRORS:
|
||||
LOG.warn("Failed sending message %s", message, exc_info=True)
|
||||
|
||||
|
||||
class _EventTarget(object):
|
||||
"""An immutable helper object that represents a target of an event."""
|
||||
|
||||
def __init__(self, future, task, queue):
|
||||
self.future = future
|
||||
self.task = task
|
||||
self.queue = queue
|
||||
|
||||
|
||||
class _EventDispatcher(object):
|
||||
"""Dispatches event information received from child worker processes."""
|
||||
|
||||
# When the run() method is busy (typically in a thread) we want to set
|
||||
# these so that the thread can know how long to sleep when there is no
|
||||
# active work to dispatch (when there is active targets, there queues
|
||||
# will have amount/count of items removed before returning to work on
|
||||
# the next target...)
|
||||
_SPIN_PERIODICITY = 0.01
|
||||
_SPIN_DISPATCH_AMOUNT = 1
|
||||
|
||||
# TODO(harlowja): look again at using a non-polling mechanism that uses
|
||||
# select instead of queues to achieve better ability to detect when
|
||||
# messages are ready/available...
|
||||
|
||||
def __init__(self, dispatch_periodicity=None):
|
||||
if dispatch_periodicity is None:
|
||||
dispatch_periodicity = self._SPIN_PERIODICITY
|
||||
if dispatch_periodicity <= 0:
|
||||
raise ValueError("Provided dispatch periodicity must be greater"
|
||||
" than zero and not '%s'" % dispatch_periodicity)
|
||||
self._targets = set()
|
||||
self._dead = threading_utils.Event()
|
||||
self._lock = threading.Lock()
|
||||
self._periodicity = dispatch_periodicity
|
||||
self._stop_when_empty = False
|
||||
|
||||
def register(self, target):
|
||||
with self._lock:
|
||||
self._targets.add(target)
|
||||
|
||||
def _dispatch_until_empty(self, target, limit=None):
|
||||
it = _maybe_forever(limit=limit)
|
||||
while True:
|
||||
try:
|
||||
six.next(it)
|
||||
except StopIteration:
|
||||
break
|
||||
else:
|
||||
try:
|
||||
message = target.queue.get_nowait()
|
||||
except compat_queue.Empty:
|
||||
break
|
||||
else:
|
||||
try:
|
||||
self._dispatch(target.task, message)
|
||||
finally:
|
||||
target.queue.task_done()
|
||||
|
||||
def deregister(self, target):
|
||||
with self._lock:
|
||||
try:
|
||||
self._targets.remove(target)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
self._stop_when_empty = False
|
||||
self._dead.clear()
|
||||
|
||||
def interrupt(self):
|
||||
self._stop_when_empty = True
|
||||
self._dead.set()
|
||||
|
||||
def _dispatch(self, task, message):
|
||||
LOG.blather("Dispatching message %s to task '%s'", message, task)
|
||||
body = message['body']
|
||||
task.notifier.notify(body['event_type'], body['details'])
|
||||
|
||||
def _dispatch_iter(self, targets):
|
||||
# A generator that yields at certain points to allow the main run()
|
||||
# method to use this to dispatch in iterations (and also allows it
|
||||
# to check if it has been stopped between iterations).
|
||||
for target in targets:
|
||||
if target not in self._targets:
|
||||
# Must of been removed...
|
||||
continue
|
||||
# NOTE(harlowja): Limits are used here to avoid one
|
||||
# task unequally dispatching, this forces round-robin
|
||||
# like behavior...
|
||||
self._dispatch_until_empty(target,
|
||||
limit=self._SPIN_DISPATCH_AMOUNT)
|
||||
yield target
|
||||
|
||||
def run(self):
|
||||
w = timing.StopWatch(duration=self._periodicity)
|
||||
while (not self._dead.is_set() or
|
||||
(self._stop_when_empty and self._targets)):
|
||||
w.restart()
|
||||
with self._lock:
|
||||
targets = self._targets.copy()
|
||||
for _target in self._dispatch_iter(targets):
|
||||
if self._stop_when_empty:
|
||||
continue
|
||||
if self._dead.is_set():
|
||||
break
|
||||
leftover = w.leftover()
|
||||
if leftover:
|
||||
self._dead.wait(leftover)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class TaskExecutor(object):
|
||||
"""Executes and reverts tasks.
|
||||
@@ -176,22 +385,125 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
|
||||
NOTE(harlowja): this executor executes tasks in external processes, so that
|
||||
implies that tasks that are sent to that external process are pickleable
|
||||
since this is how the multiprocessing works (sending pickled objects back
|
||||
and forth) and that the bound handlers (for progress updating in
|
||||
particular) are proxied correctly from that external process to the one
|
||||
that is alive in the parent process to ensure that callbacks registered in
|
||||
the parent are executed on events in the child.
|
||||
"""
|
||||
|
||||
def __init__(self, executor=None, max_workers=None,
             dispatch_periodicity=None):
    """Sets up the manager, queue factory and event dispatcher.

    :param executor: passed through to the parent constructor
    :param max_workers: passed through to the parent constructor
    :param dispatch_periodicity: passed to the event dispatcher that is
                                 created here
    """
    super(ParallelProcessTaskExecutor, self).__init__(
        executor=executor, max_workers=max_workers)
    dispatcher = _EventDispatcher(
        dispatch_periodicity=dispatch_periodicity)
    self._manager = multiprocessing.Manager()
    # Each call produces a new joinable queue owned by the manager.
    self._queue_factory = lambda: self._manager.JoinableQueue()
    self._dispatcher = dispatcher
    # The dispatching thread; created on start().
    self._worker = None
|
||||
|
||||
def _create_executor(self, max_workers=None):
    """Builds a process pool executor with the given worker count."""
    pool = futures.ProcessPoolExecutor(max_workers=max_workers)
    return pool
|
||||
|
||||
def start(self):
    """Starts the parent and (if not already alive) the dispatch thread."""
    super(ParallelProcessTaskExecutor, self).start()
    if threading_utils.is_alive(self._worker):
        # Dispatching thread is already running; nothing more to do.
        return
    self._dispatcher.reset()
    self._worker = threading_utils.daemon_thread(self._dispatcher.run)
    self._worker.start()
|
||||
|
||||
def stop(self):
    """Interrupts dispatching, stops the parent and joins the thread."""
    # Signal the dispatcher first, then stop the parent.
    self._dispatcher.interrupt()
    super(ParallelProcessTaskExecutor, self).stop()
    if not threading_utils.is_alive(self._worker):
        return
    self._worker.join()
    self._worker = None
    self._dispatcher.reset()
|
||||
|
||||
def _rebind_task(self, task, clone, queue, progress_callback=None):
|
||||
# Creates and binds proxies for all events the task could receive
|
||||
# so that when the clone runs in another process that this task
|
||||
# can recieve the same notifications (thus making it look like the
|
||||
# the notifications are transparently happening in this process).
|
||||
needed = set()
|
||||
for (event_type, listeners) in task.notifier.listeners_iter():
|
||||
if listeners:
|
||||
needed.add(event_type)
|
||||
# We don't register for the 'ANY' event; since that meta event type
|
||||
# will be correctly proxied by the task notifier directly without
|
||||
# needing clone replication.
|
||||
needed.discard(task.notifier.ANY)
|
||||
if progress_callback is not None:
|
||||
needed.add(_UPDATE_PROGRESS)
|
||||
for event_type in needed:
|
||||
clone.notifier.register(event_type, _EventSender(queue))
|
||||
return needed
|
||||
|
||||
def _submit_task(self, func, task, *args, **kwargs):
    """Submit a function to run the given task (with given args/kwargs).

    NOTE(harlowja): Adjust all events to be proxies instead since we want
    those callbacks to be activated in this process, not in the child,
    also since typically callbacks are functors (or callables) we can
    not pickle those in the first place...

    To make sure people understand how this works, the following is a
    lengthy description of what is going on here, read at will:

    So to ensure that we are proxying task triggered events that occur
    in the executed subprocess (which will be created and used by the
    thing using the multiprocessing based executor) we need to establish
    a link between that process and this process that ensures that when an
    event is triggered in that task in that process that a corresponding
    event is triggered on the original task that was requested to be run
    in this process.

    To accomplish this we have to create a copy of the task (without
    any listeners) and then reattach a new set of listeners that will
    now instead of calling the desired listeners just place messages
    for this process (a dispatcher thread that is created in this class)
    to dispatch to the original task (using a per task queue that is used
    and associated to know which task to proxy back to, since it is
    possible that there may be *many* subprocesses running at the same
    time, each running a different task).

    Once the subprocess task has finished execution, the executor will
    then trigger a callback (``on_done`` in this case) that will remove
    the task + queue from the dispatcher (which will stop any further
    proxying back to the original task).

    :param func: function that the child process calls with the clone
                 followed by ``args``/``kwargs``
    :param task: the original task, which receives proxied notifications
    :returns: the future produced by submitting the work to the executor
    """
    # Pop exactly once; a second pop would always produce None and the
    # progress callback would never get bound.
    progress_callback = kwargs.pop('progress_callback', None)
    clone = task.copy(retain_listeners=False)
    queue = self._queue_factory()
    bound = self._rebind_task(task, clone, queue,
                              progress_callback=progress_callback)
    LOG.blather("Bound %s event types to clone of '%s'", bound, task)
    if progress_callback is not None:
        binder = functools.partial(task.notifier.register,
                                   _UPDATE_PROGRESS, progress_callback)
        unbinder = functools.partial(task.notifier.deregister,
                                     _UPDATE_PROGRESS, progress_callback)
    else:
        binder = unbinder = lambda: None

    # Ensure the target task (not the clone) is ready and able to receive
    # dispatched messages (and start the dispatching process by
    # registering) with the dispatcher.
    binder()
    work = _JoinedWorkItem(queue, func, clone, *args, **kwargs)
    try:
        # The clone is submitted only here (wrapped in the joined work
        # item); submitting it via the parent as well would run it twice.
        fut = self._executor.submit(work)
    except RuntimeError:
        with excutils.save_and_reraise_exception():
            unbinder()

    # This will trigger the proxying to begin...
    target = _EventTarget(fut, task, queue)
    self._dispatcher.register(target)

    def on_done(unbinder, target, fut):
        self._dispatcher.deregister(target)
        unbinder()

    fut.atom = task
    fut.add_done_callback(functools.partial(on_done, unbinder, target))
    return fut
|
||||
|
||||
101
taskflow/examples/alphabet_soup.py
Normal file
101
taskflow/examples/alphabet_soup.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fractions
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import string
|
||||
import sys
|
||||
import time
|
||||
|
||||
from concurrent import futures
|
||||
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
|
||||
self_dir = os.path.abspath(os.path.dirname(__file__))
|
||||
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
sys.path.insert(0, top_dir)
|
||||
sys.path.insert(0, self_dir)
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow import exceptions
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow import task
|
||||
|
||||
|
||||
# In this example we show how a simple linear set of tasks can be executed
|
||||
# using local processes (and not threads or remote workers) with minimal (if
|
||||
# any) modification to those tasks to make them safe to run in this mode.
|
||||
#
|
||||
# This is useful since it allows further scaling up your workflows when thread
|
||||
# execution starts to become a bottleneck (which it can start to be due to the
|
||||
# GIL in python). It also offers an intermediary scalable runner that can be
|
||||
# used when the scale and/or setup of remote workers is not desirable.
|
||||
|
||||
# How many local processes to potentially use when executing... (one is fine
|
||||
# for this example, but more can be used to play around with what happens
|
||||
# with many...)
|
||||
WORKERS = 1
|
||||
|
||||
|
||||
def progress_printer(task, event_type, details):
    """Print the completion percentage reported by a progress event.

    :param task: object whose ``name`` is included in the printed line
    :param event_type: type of the received event (unused)
    :param details: mapping holding a ``progress`` fraction
    :raises KeyError: if ``details`` has no ``progress`` entry
    """
    # This callback, attached to each task will be called in the local
    # process (not the child processes)...
    #
    # Read (do not pop) the progress so that the details mapping handed to
    # this listener is left untouched for whoever else may be using it.
    progress = details['progress']
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))
|
||||
|
||||
|
||||
class AlphabetTask(task.Task):
    """Task that reports progress in steps, sleeping after each report."""

    # Second delay between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here).
    _PROGRESS_PARTS = [fractions.Fraction(part, 5) for part in range(1, 5)]

    def execute(self):
        for fraction in self._PROGRESS_PARTS:
            self.update_progress(fraction)
            time.sleep(self._DELAY)
|
||||
|
||||
|
||||
# Build a linear flow holding one progress reporting task per lowercase
# letter; each task gets its own printer bound as a progress listener.
print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    letter_task = AlphabetTask(letter)
    printer = functools.partial(progress_printer, letter_task)
    letter_task.notifier.register(task.EVENT_UPDATE_PROGRESS, printer)
    soup.add(letter_task)

# Run the flow with a parallel engine that is backed by a process pool.
try:
    with futures.ProcessPoolExecutor(WORKERS) as pool:
        print("Loading...")
        engine = engines.load(soup, engine='parallel', executor=pool)
        print("Compiling...")
        engine.compile()
        print("Preparing...")
        engine.prepare()
        print("Running...")
        engine.run()
        print("Done...")
except exceptions.NotImplementedError as exc:
    print(exc)
|
||||
@@ -37,8 +37,13 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def try_lock(lock):
|
||||
"""Attempts to acquire a lock, and autoreleases if acquisition occurred."""
|
||||
was_locked = lock.acquire(blocking=False)
|
||||
"""Attempts to acquire a lock, and auto releases if acquired (on exit)."""
|
||||
# NOTE(harlowja): the keyword argument for 'blocking' does not work
|
||||
# in py2.x and only is fixed in py3.x (this adjustment is documented
|
||||
# and/or debated in http://bugs.python.org/issue10789); so we'll just
|
||||
# stick to the format that works in both (oddly the keyword argument
|
||||
# works in py2.x but only with reentrant locks).
|
||||
was_locked = lock.acquire(False)
|
||||
try:
|
||||
yield was_locked
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user