Make the dispatcher handler be an actual type

Instead of having the dispatcher target be a tuple
or a single callback, have it be an actual type with
comments as to what the types fields are and how they
are used.

This makes it more obvious as to what those fields are
and how they are used so that it is easier to understand
how the WBE engine and components work.

Change-Id: I5541ccd5a7aa6ae73ed9adceeac2c0524e2a1dc9
This commit is contained in:
Joshua Harlow
2015-02-15 20:03:53 -08:00
parent 14009d2334
commit d1b6776435
7 changed files with 58 additions and 27 deletions

View File

@@ -23,6 +23,36 @@ from taskflow.utils import kombu_utils as ku
LOG = logging.getLogger(__name__)
class Handler(object):
"""Component(s) that will be called on reception of messages."""
__slots__ = ['_process_message', '_validator']
def __init__(self, process_message, validator=None):
self._process_message = process_message
self._validator = validator
@property
def process_message(self):
"""Main callback that is called to process a received message.
This is only called after the format has been validated (using
the ``validator`` callback if applicable) and only after the message
has been acknowledged.
"""
return self._process_message
@property
def validator(self):
"""Optional callback that will be activated before processing.
This callback if present is expected to validate the message and
raise :py:class:`~taskflow.exceptions.InvalidFormat` if the message
is not valid.
"""
return self._validator
class TypeDispatcher(object):
"""Receives messages and dispatches to type specific handlers."""
@@ -99,10 +129,9 @@ class TypeDispatcher(object):
LOG.warning("Unexpected message type: '%s' in message"
" '%s'", message_type, ku.DelayedPretty(message))
else:
if isinstance(handler, (tuple, list)):
handler, validator = handler
if handler.validator is not None:
try:
validator(data)
handler.validator(data)
except excp.InvalidFormat as e:
message.reject_log_error(
logger=LOG, errors=(kombu_exc.MessageStateError,))
@@ -115,7 +144,7 @@ class TypeDispatcher(object):
if message.acknowledged:
LOG.debug("Message '%s' was acknowledged.",
ku.DelayedPretty(message))
handler(data, message)
handler.process_message(data, message)
else:
message.reject_log_error(logger=LOG,
errors=(kombu_exc.MessageStateError,))

View File

@@ -36,8 +36,9 @@ class WorkerBasedActionEngine(engine.ActionEngine):
of the (PENDING, WAITING) request states. When
expired the associated task the request was made
for will have its result become a
`RequestTimeout` exception instead of its
normally returned value (or raised exception).
:py:class:`~taskflow.exceptions.RequestTimeout`
exception instead of its normally returned
value (or raised exception).
:param transport_options: transport specific options (see:
http://kombu.readthedocs.org/ for what these
options imply and are expected to be)

View File

@@ -19,6 +19,7 @@ import functools
from oslo_utils import timeutils
from taskflow.engines.action_engine import executor
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow.engines.worker_based import types as wt
@@ -44,10 +45,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._requests_cache = wt.RequestsCache()
self._transition_timeout = transition_timeout
type_handlers = {
pr.RESPONSE: [
self._process_response,
pr.Response.validate,
],
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
}
self._proxy = proxy.Proxy(uuid, exchange,
type_handlers=type_handlers,

View File

@@ -19,6 +19,7 @@ import functools
from oslo_utils import reflection
import six
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow import logging
@@ -38,14 +39,13 @@ class Server(object):
url=None, transport=None, transport_options=None,
retry_options=None):
type_handlers = {
pr.NOTIFY: [
pr.NOTIFY: dispatcher.Handler(
self._delayed_process(self._process_notify),
functools.partial(pr.Notify.validate, response=False),
],
pr.REQUEST: [
validator=functools.partial(pr.Notify.validate,
response=False)),
pr.REQUEST: dispatcher.Handler(
self._delayed_process(self._process_request),
pr.Request.validate,
],
validator=pr.Request.validate),
}
self._executor = executor
self._proxy = proxy.Proxy(topic, exchange,

View File

@@ -23,6 +23,7 @@ import threading
from oslo_utils import reflection
import six
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
from taskflow.types import cache as base
@@ -165,10 +166,10 @@ class ProxyWorkerFinder(WorkerFinder):
self._workers = {}
self._uuid = uuid
self._proxy.dispatcher.type_handlers.update({
pr.NOTIFY: [
pr.NOTIFY: dispatcher.Handler(
self._process_response,
functools.partial(pr.Notify.validate, response=True),
],
validator=functools.partial(pr.Notify.validate,
response=True)),
})
self._counter = itertools.count()

View File

@@ -40,12 +40,12 @@ def mock_acked_message(ack_ok=True, **kwargs):
class TestDispatcher(test.TestCase):
def test_creation(self):
on_hello = mock.MagicMock()
handlers = {'hello': on_hello}
handlers = {'hello': dispatcher.Handler(on_hello)}
dispatcher.TypeDispatcher(type_handlers=handlers)
def test_on_message(self):
on_hello = mock.MagicMock()
handlers = {'hello': on_hello}
handlers = {'hello': dispatcher.Handler(on_hello)}
d = dispatcher.TypeDispatcher(type_handlers=handlers)
msg = mock_acked_message(properties={'type': 'hello'})
d.on_message("", msg)
@@ -70,7 +70,7 @@ class TestDispatcher(test.TestCase):
def test_failed_ack(self):
on_hello = mock.MagicMock()
handlers = {'hello': on_hello}
handlers = {'hello': dispatcher.Handler(on_hello)}
d = dispatcher.TypeDispatcher(type_handlers=handlers)
msg = mock_acked_message(ack_ok=False,
properties={'type': 'hello'})

View File

@@ -16,6 +16,7 @@
from oslo_utils import uuidutils
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow import test
@@ -35,7 +36,7 @@ class TestMessagePump(test.TestCase):
on_notify = mock.MagicMock()
on_notify.side_effect = lambda *args, **kwargs: barrier.set()
handlers = {pr.NOTIFY: on_notify}
handlers = {pr.NOTIFY: dispatcher.Handler(on_notify)}
p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers,
transport='memory',
transport_options={
@@ -60,7 +61,7 @@ class TestMessagePump(test.TestCase):
on_response = mock.MagicMock()
on_response.side_effect = lambda *args, **kwargs: barrier.set()
handlers = {pr.RESPONSE: on_response}
handlers = {pr.RESPONSE: dispatcher.Handler(on_response)}
p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers,
transport='memory',
transport_options={
@@ -96,9 +97,9 @@ class TestMessagePump(test.TestCase):
on_request.side_effect = countdown
handlers = {
pr.NOTIFY: on_notify,
pr.RESPONSE: on_response,
pr.REQUEST: on_request,
pr.NOTIFY: dispatcher.Handler(on_notify),
pr.RESPONSE: dispatcher.Handler(on_response),
pr.REQUEST: dispatcher.Handler(on_request),
}
p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers,
transport='memory',