diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py index 13470e08..27350731 100644 --- a/taskflow/engines/worker_based/dispatcher.py +++ b/taskflow/engines/worker_based/dispatcher.py @@ -23,6 +23,36 @@ from taskflow.utils import kombu_utils as ku LOG = logging.getLogger(__name__) +class Handler(object): + """Component(s) that will be called on reception of messages.""" + + __slots__ = ['_process_message', '_validator'] + + def __init__(self, process_message, validator=None): + self._process_message = process_message + self._validator = validator + + @property + def process_message(self): + """Main callback that is called to process a received message. + + This is only called after the format has been validated (using + the ``validator`` callback if applicable) and only after the message + has been acknowledged. + """ + return self._process_message + + @property + def validator(self): + """Optional callback that will be activated before processing. + + This callback if present is expected to validate the message and + raise :py:class:`~taskflow.exceptions.InvalidFormat` if the message + is not valid. + """ + return self._validator + + class TypeDispatcher(object): """Receives messages and dispatches to type specific handlers.""" @@ -99,10 +129,9 @@ class TypeDispatcher(object): LOG.warning("Unexpected message type: '%s' in message" " '%s'", message_type, ku.DelayedPretty(message)) else: - if isinstance(handler, (tuple, list)): - handler, validator = handler + if handler.validator is not None: try: - validator(data) + handler.validator(data) except excp.InvalidFormat as e: message.reject_log_error( logger=LOG, errors=(kombu_exc.MessageStateError,)) @@ -115,7 +144,7 @@ class TypeDispatcher(object): if message.acknowledged: LOG.debug("Message '%s' was acknowledged.", ku.DelayedPretty(message)) - handler(data, message) + handler.process_message(data, message) else: message.reject_log_error(logger=LOG, errors=(kombu_exc.MessageStateError,)) diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index 31dcade1..a22a5d9f 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -36,8 +36,9 @@ class WorkerBasedActionEngine(engine.ActionEngine): of the (PENDING, WAITING) request states. When expired the associated task the request was made for will have its result become a - `RequestTimeout` exception instead of its - normally returned value (or raised exception). + :py:class:`~taskflow.exceptions.RequestTimeout` + exception instead of its normally returned + value (or raised exception). :param transport_options: transport specific options (see: http://kombu.readthedocs.org/ for what these options imply and are expected to be) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 7722432f..b5b7d39b 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -19,6 +19,7 @@ import functools from oslo_utils import timeutils from taskflow.engines.action_engine import executor +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow.engines.worker_based import types as wt @@ -44,10 +45,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._requests_cache = wt.RequestsCache() self._transition_timeout = transition_timeout type_handlers = { - pr.RESPONSE: [ - self._process_response, - pr.Response.validate, - ], + pr.RESPONSE: dispatcher.Handler(self._process_response, + validator=pr.Response.validate), } self._proxy = proxy.Proxy(uuid, exchange, type_handlers=type_handlers, diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 1c9a605f..1043e879 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -19,6 +19,7 @@ import functools from oslo_utils import reflection import six +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import logging @@ -38,14 +39,13 @@ class Server(object): url=None, transport=None, transport_options=None, retry_options=None): type_handlers = { - pr.NOTIFY: [ + pr.NOTIFY: dispatcher.Handler( self._delayed_process(self._process_notify), - functools.partial(pr.Notify.validate, response=False), - ], - pr.REQUEST: [ + validator=functools.partial(pr.Notify.validate, + response=False)), + pr.REQUEST: dispatcher.Handler( self._delayed_process(self._process_request), - pr.Request.validate, - ], + validator=pr.Request.validate), } self._executor = executor self._proxy = proxy.Proxy(topic, exchange, diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 3d53e87a..1ee8f4b8 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -23,6 +23,7 @@ import threading from oslo_utils import reflection import six +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.types import cache as base @@ -165,10 +166,10 @@ class ProxyWorkerFinder(WorkerFinder): self._workers = {} self._uuid = uuid self._proxy.dispatcher.type_handlers.update({ - pr.NOTIFY: [ + pr.NOTIFY: dispatcher.Handler( self._process_response, - functools.partial(pr.Notify.validate, response=True), - ], + validator=functools.partial(pr.Notify.validate, + response=True)), }) self._counter = itertools.count() diff --git a/taskflow/tests/unit/worker_based/test_dispatcher.py b/taskflow/tests/unit/worker_based/test_dispatcher.py index 21fccdcc..af97e485 100644 --- a/taskflow/tests/unit/worker_based/test_dispatcher.py +++ b/taskflow/tests/unit/worker_based/test_dispatcher.py @@ -40,12 +40,12 @@ def mock_acked_message(ack_ok=True, **kwargs): class TestDispatcher(test.TestCase): def test_creation(self): on_hello = mock.MagicMock() - handlers = {'hello': on_hello} + handlers = {'hello': dispatcher.Handler(on_hello)} dispatcher.TypeDispatcher(type_handlers=handlers) def test_on_message(self): on_hello = mock.MagicMock() - handlers = {'hello': on_hello} + handlers = {'hello': dispatcher.Handler(on_hello)} d = dispatcher.TypeDispatcher(type_handlers=handlers) msg = mock_acked_message(properties={'type': 'hello'}) d.on_message("", msg) @@ -70,7 +70,7 @@ class TestDispatcher(test.TestCase): def test_failed_ack(self): on_hello = mock.MagicMock() - handlers = {'hello': on_hello} + handlers = {'hello': dispatcher.Handler(on_hello)} d = dispatcher.TypeDispatcher(type_handlers=handlers) msg = mock_acked_message(ack_ok=False, properties={'type': 'hello'}) diff --git a/taskflow/tests/unit/worker_based/test_message_pump.py b/taskflow/tests/unit/worker_based/test_message_pump.py index d8438131..59bddbda 100644 --- a/taskflow/tests/unit/worker_based/test_message_pump.py +++ b/taskflow/tests/unit/worker_based/test_message_pump.py @@ -16,6 +16,7 @@ from oslo_utils import uuidutils +from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import test @@ -35,7 +36,7 @@ class TestMessagePump(test.TestCase): on_notify = mock.MagicMock() on_notify.side_effect = lambda *args, **kwargs: barrier.set() - handlers = {pr.NOTIFY: on_notify} + handlers = {pr.NOTIFY: dispatcher.Handler(on_notify)} p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers, transport='memory', transport_options={ @@ -60,7 +61,7 @@ class TestMessagePump(test.TestCase): on_response = mock.MagicMock() on_response.side_effect = lambda *args, **kwargs: barrier.set() - handlers = {pr.RESPONSE: on_response} + handlers = {pr.RESPONSE: dispatcher.Handler(on_response)} p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers, transport='memory', transport_options={ @@ -96,9 +97,9 @@ class TestMessagePump(test.TestCase): on_request.side_effect = countdown handlers = { - pr.NOTIFY: on_notify, - pr.RESPONSE: on_response, - pr.REQUEST: on_request, + pr.NOTIFY: dispatcher.Handler(on_notify), + pr.RESPONSE: dispatcher.Handler(on_response), + pr.REQUEST: dispatcher.Handler(on_request), } p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers, transport='memory',