Fix process based executor task proxying-back events

Let's dive into what the problem is here.

First a description of what happens to a task that
is to be executed in a external (but local) process
via the process executor mechanism.

When a task is about to be sent to execute in the
external (but local) process its first cloned, this
is mainly done so that its notification callbacks can
be altered in a safe manner (ie not altering the
original task object to do this) and that clone has
its notifier emptied out.

What replaces the clone's notifier callbacks though
is a new object (that has a __call__ method so it
looks like just another callback) that will send
messages to the parent process (the one that has
the engine in it) over a secure(ish) channel whenever
the local task triggers its notifier notify() method.

This allows for callbacks in the parent process to
get triggered because once the messages recieved the
original tasks notifier object has its notify() method
called (therefore those callbacks do not really know
the task they are getting messages from is executing out
of process).

The issue though is that if the ANY(*) event type is registered
due to how it works in the notifier is that if the child/cloned
notifier has the ANY event type registered and the cloned task
calls notify() with a specific event this will cause the ANY
callback (in the clone) to transmit a message *and* it will
cause the *specific* event callback to also transmit a message
back to the parent process.

On the engine process side it will get 2 messages and trigger
the callbacks 3 times (twice for the specific event callback
because how the local notifier has the ANY callback registered
and one more time when the local process also sends the same
event based on its registration of the ANY event in the child
process).

This is not what is expected (the message rcved on the engine
process should only trigger one callback to get triggered
if the engine process task has no ANY callback registered or two
engine process callbacks to get triggered if the engine process
task has the ANY callback registered).

Closes-Bug: #1537948

Change-Id: I271bf1f23ad73df6c177cf00fd902c4881ba44ae
This commit is contained in:
Joshua Harlow 2016-01-25 15:56:07 -08:00 committed by ChangBo Guo(gcb)
parent 1229eb2e8e
commit 84c7a7b2c7
3 changed files with 87 additions and 9 deletions

View File

@ -34,6 +34,7 @@ import six
from taskflow.engines.action_engine import executor as base
from taskflow import logging
from taskflow import task as ta
from taskflow.types import notifier as nt
from taskflow.utils import iter_utils
from taskflow.utils import misc
from taskflow.utils import schema_utils as su
@ -675,16 +676,38 @@ class ParallelProcessTaskExecutor(base.ParallelTaskExecutor):
# so that when the clone runs in another process that this task
# can receive the same notifications (thus making it look like the
# the notifications are transparently happening in this process).
needed = set()
proxy_event_types = set()
for (event_type, listeners) in task.notifier.listeners_iter():
if listeners:
needed.add(event_type)
proxy_event_types.add(event_type)
if progress_callback is not None:
needed.add(ta.EVENT_UPDATE_PROGRESS)
if needed:
proxy_event_types.add(ta.EVENT_UPDATE_PROGRESS)
if nt.Notifier.ANY in proxy_event_types:
# NOTE(harlowja): If ANY is present, just have it be
# the **only** event registered, as all other events will be
# sent if ANY is registered (due to the nature of ANY sending
# all the things); if we also include the other event types
# in this set if ANY is present we will receive duplicate
# messages in this process (the one where the local
# task callbacks are being triggered). For example the
# emissions of the tasks notifier (that is running out
# of process) will for specific events send messages for
# its ANY event type **and** the specific event
# type (2 messages, when we just want one) which will
# cause > 1 notify() call on the local tasks notifier, which
# causes more local callback triggering than we want
# to actually happen.
proxy_event_types = set([nt.Notifier.ANY])
if proxy_event_types:
# This sender acts as our forwarding proxy target, it
# will be sent pickled to the process that will execute
# the needed task and it will do the work of using the
# channel object to send back messages to this process for
# dispatch into the local task.
sender = EventSender(channel)
for event_type in needed:
for event_type in proxy_event_types:
clone.notifier.register(event_type, sender)
return bool(proxy_event_types)
def register():
if progress_callback is not None:
@ -698,14 +721,17 @@ class ParallelProcessTaskExecutor(base.ParallelTaskExecutor):
progress_callback)
self._dispatcher.targets.pop(identity, None)
rebind_task()
register()
should_register = rebind_task()
if should_register:
register()
try:
fut = self._executor.submit(func, clone, *args, **kwargs)
except RuntimeError:
with excutils.save_and_reraise_exception():
deregister()
if should_register:
deregister()
fut.atom = task
fut.add_done_callback(deregister)
if should_register:
fut.add_done_callback(deregister)
return fut

View File

@ -1527,6 +1527,48 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
max_workers=self._EXECUTOR_WORKERS,
**kwargs)
def test_update_progress_notifications_proxied(self):
captured = collections.defaultdict(list)
def notify_me(event_type, details):
captured[event_type].append(details)
a = utils.MultiProgressingTask('a')
a.notifier.register(a.notifier.ANY, notify_me)
progress_chunks = list(x / 10.0 for x in range(1, 10))
e = self._make_engine(a, store={'progress_chunks': progress_chunks})
e.run()
self.assertEqual(11, len(captured[task.EVENT_UPDATE_PROGRESS]))
def test_custom_notifications_proxied(self):
captured = collections.defaultdict(list)
def notify_me(event_type, details):
captured[event_type].append(details)
a = utils.EmittingTask('a')
a.notifier.register(a.notifier.ANY, notify_me)
e = self._make_engine(a)
e.run()
self.assertEqual(1, len(captured['hi']))
self.assertEqual(2, len(captured[task.EVENT_UPDATE_PROGRESS]))
def test_just_custom_notifications_proxied(self):
captured = collections.defaultdict(list)
def notify_me(event_type, details):
captured[event_type].append(details)
a = utils.EmittingTask('a')
a.notifier.register('hi', notify_me)
e = self._make_engine(a)
e.run()
self.assertEqual(1, len(captured['hi']))
self.assertEqual(0, len(captured[task.EVENT_UPDATE_PROGRESS]))
class WorkerBasedEngineTest(EngineTaskTest,
EngineMultipleResultsTest,

View File

@ -19,6 +19,7 @@ import string
import threading
import time
from oslo_utils import timeutils
import redis
import six
@ -104,6 +105,15 @@ class DummyTask(task.Task):
pass
class EmittingTask(task.Task):
TASK_EVENTS = (task.EVENT_UPDATE_PROGRESS, 'hi')
def execute(self, *args, **kwargs):
self.notifier.notify('hi',
details={'sent_on': timeutils.utcnow(),
'args': args, 'kwargs': kwargs})
class AddOneSameProvidesRequires(task.Task):
default_provides = 'value'