From d1b6776435644330698722143e36c37b9c3ca375 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sun, 15 Feb 2015 20:03:53 -0800 Subject: [PATCH] Make the dispatcher handler be an actual type Instead of having the dispatcher target be a tuple or a single callback, have it be an actual type with comments as to what the types fields are and how they are used. This makes it more obvious as to what those fields are and how they are used so that it is easier to understand how the WBE engine and components work. Change-Id: I5541ccd5a7aa6ae73ed9adceeac2c0524e2a1dc9 --- taskflow/engines/worker_based/dispatcher.py | 37 +++++++++++++++++-- taskflow/engines/worker_based/engine.py | 5 ++- taskflow/engines/worker_based/executor.py | 7 ++-- taskflow/engines/worker_based/server.py | 12 +++--- taskflow/engines/worker_based/types.py | 7 ++-- .../unit/worker_based/test_dispatcher.py | 6 +-- .../unit/worker_based/test_message_pump.py | 11 +++--- 7 files changed, 58 insertions(+), 27 deletions(-) diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py index 13470e08..27350731 100644 --- a/taskflow/engines/worker_based/dispatcher.py +++ b/taskflow/engines/worker_based/dispatcher.py @@ -23,6 +23,36 @@ from taskflow.utils import kombu_utils as ku LOG = logging.getLogger(__name__) +class Handler(object): + """Component(s) that will be called on reception of messages.""" + + __slots__ = ['_process_message', '_validator'] + + def __init__(self, process_message, validator=None): + self._process_message = process_message + self._validator = validator + + @property + def process_message(self): + """Main callback that is called to process a received message. + + This is only called after the format has been validated (using + the ``validator`` callback if applicable) and only after the message + has been acknowledged. + """ + return self._process_message + + @property + def validator(self): + """Optional callback that will be activated before processing. + + This callback if present is expected to validate the message and + raise :py:class:`~taskflow.exceptions.InvalidFormat` if the message + is not valid. + """ + return self._validator + + class TypeDispatcher(object): """Receives messages and dispatches to type specific handlers.""" @@ -99,10 +129,9 @@ class TypeDispatcher(object): LOG.warning("Unexpected message type: '%s' in message" " '%s'", message_type, ku.DelayedPretty(message)) else: - if isinstance(handler, (tuple, list)): - handler, validator = handler + if handler.validator is not None: try: - validator(data) + handler.validator(data) except excp.InvalidFormat as e: message.reject_log_error( logger=LOG, errors=(kombu_exc.MessageStateError,)) @@ -115,7 +144,7 @@ class TypeDispatcher(object): if message.acknowledged: LOG.debug("Message '%s' was acknowledged.", ku.DelayedPretty(message)) - handler(data, message) + handler.process_message(data, message) else: message.reject_log_error(logger=LOG, errors=(kombu_exc.MessageStateError,)) diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index 31dcade1..a22a5d9f 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -36,8 +36,9 @@ class WorkerBasedActionEngine(engine.ActionEngine): of the (PENDING, WAITING) request states. When expired the associated task the request was made for will have its result become a - `RequestTimeout` exception instead of its - normally returned value (or raised exception). + :py:class:`~taskflow.exceptions.RequestTimeout` + exception instead of its normally returned + value (or raised exception). :param transport_options: transport specific options (see: http://kombu.readthedocs.org/ for what these options imply and are expected to be) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 7722432f..b5b7d39b 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -19,6 +19,7 @@ import functools from oslo_utils import timeutils from taskflow.engines.action_engine import executor +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow.engines.worker_based import types as wt @@ -44,10 +45,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._requests_cache = wt.RequestsCache() self._transition_timeout = transition_timeout type_handlers = { - pr.RESPONSE: [ - self._process_response, - pr.Response.validate, - ], + pr.RESPONSE: dispatcher.Handler(self._process_response, + validator=pr.Response.validate), } self._proxy = proxy.Proxy(uuid, exchange, type_handlers=type_handlers, diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 1c9a605f..1043e879 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -19,6 +19,7 @@ import functools from oslo_utils import reflection import six +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import logging @@ -38,14 +39,13 @@ class Server(object): url=None, transport=None, transport_options=None, retry_options=None): type_handlers = { - pr.NOTIFY: [ + pr.NOTIFY: dispatcher.Handler( self._delayed_process(self._process_notify), - functools.partial(pr.Notify.validate, response=False), - ], - pr.REQUEST: [ + validator=functools.partial(pr.Notify.validate, + response=False)), + pr.REQUEST: dispatcher.Handler( self._delayed_process(self._process_request), - pr.Request.validate, - ], + validator=pr.Request.validate), } self._executor = executor self._proxy = proxy.Proxy(topic, exchange, diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 3d53e87a..1ee8f4b8 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -23,6 +23,7 @@ import threading from oslo_utils import reflection import six +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.types import cache as base @@ -165,10 +166,10 @@ class ProxyWorkerFinder(WorkerFinder): self._workers = {} self._uuid = uuid self._proxy.dispatcher.type_handlers.update({ - pr.NOTIFY: [ + pr.NOTIFY: dispatcher.Handler( self._process_response, - functools.partial(pr.Notify.validate, response=True), - ], + validator=functools.partial(pr.Notify.validate, + response=True)), }) self._counter = itertools.count() diff --git a/taskflow/tests/unit/worker_based/test_dispatcher.py b/taskflow/tests/unit/worker_based/test_dispatcher.py index 21fccdcc..af97e485 100644 --- a/taskflow/tests/unit/worker_based/test_dispatcher.py +++ b/taskflow/tests/unit/worker_based/test_dispatcher.py @@ -40,12 +40,12 @@ def mock_acked_message(ack_ok=True, **kwargs): class TestDispatcher(test.TestCase): def test_creation(self): on_hello = mock.MagicMock() - handlers = {'hello': on_hello} + handlers = {'hello': dispatcher.Handler(on_hello)} dispatcher.TypeDispatcher(type_handlers=handlers) def test_on_message(self): on_hello = mock.MagicMock() - handlers = {'hello': on_hello} + handlers = {'hello': dispatcher.Handler(on_hello)} d = dispatcher.TypeDispatcher(type_handlers=handlers) msg = mock_acked_message(properties={'type': 'hello'}) d.on_message("", msg) @@ -70,7 +70,7 @@ class TestDispatcher(test.TestCase): def test_failed_ack(self): on_hello = mock.MagicMock() - handlers = {'hello': on_hello} + handlers = {'hello': dispatcher.Handler(on_hello)} d = dispatcher.TypeDispatcher(type_handlers=handlers) msg = mock_acked_message(ack_ok=False, properties={'type': 'hello'}) diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py index d8438131..59bddbda 100644 --- a/taskflow/tests/unit/worker_based/test_message_pump.py +++ b/taskflow/tests/unit/worker_based/test_message_pump.py @@ -16,6 +16,7 @@ from oslo_utils import uuidutils +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import test @@ -35,7 +36,7 @@ class TestMessagePump(test.TestCase): on_notify = mock.MagicMock() on_notify.side_effect = lambda *args, **kwargs: barrier.set() - handlers = {pr.NOTIFY: on_notify} + handlers = {pr.NOTIFY: dispatcher.Handler(on_notify)} p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers, transport='memory', transport_options={ @@ -60,7 +61,7 @@ class TestMessagePump(test.TestCase): on_response = mock.MagicMock() on_response.side_effect = lambda *args, **kwargs: barrier.set() - handlers = {pr.RESPONSE: on_response} + handlers = {pr.RESPONSE: dispatcher.Handler(on_response)} p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers, transport='memory', transport_options={ @@ -96,9 +97,9 @@ class TestMessagePump(test.TestCase): on_request.side_effect = countdown handlers = { - pr.NOTIFY: on_notify, - pr.RESPONSE: on_response, - pr.REQUEST: on_request, + pr.NOTIFY: dispatcher.Handler(on_notify), + pr.RESPONSE: dispatcher.Handler(on_response), + pr.REQUEST: dispatcher.Handler(on_request), } p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers, transport='memory',