Merge "Make the dispatcher handler be an actual type"

This commit is contained in:
Jenkins
2015-02-20 05:52:21 +00:00
committed by Gerrit Code Review
7 changed files with 58 additions and 27 deletions

View File

@@ -23,6 +23,36 @@ from taskflow.utils import kombu_utils as ku
LOG = logging.getLogger(__name__)
class Handler(object):
"""Component(s) that will be called on reception of messages."""
__slots__ = ['_process_message', '_validator']
def __init__(self, process_message, validator=None):
self._process_message = process_message
self._validator = validator
@property
def process_message(self):
"""Main callback that is called to process a received message.
This is only called after the format has been validated (using
the ``validator`` callback if applicable) and only after the message
has been acknowledged.
"""
return self._process_message
@property
def validator(self):
"""Optional callback that will be activated before processing.
This callback if present is expected to validate the message and
raise :py:class:`~taskflow.exceptions.InvalidFormat` if the message
is not valid.
"""
return self._validator
class TypeDispatcher(object):
"""Receives messages and dispatches to type specific handlers."""
@@ -99,10 +129,9 @@ class TypeDispatcher(object):
LOG.warning("Unexpected message type: '%s' in message"
" '%s'", message_type, ku.DelayedPretty(message))
else:
if isinstance(handler, (tuple, list)):
handler, validator = handler
if handler.validator is not None:
try:
validator(data)
handler.validator(data)
except excp.InvalidFormat as e:
message.reject_log_error(
logger=LOG, errors=(kombu_exc.MessageStateError,))
@@ -115,7 +144,7 @@ class TypeDispatcher(object):
if message.acknowledged:
LOG.debug("Message '%s' was acknowledged.",
ku.DelayedPretty(message))
handler(data, message)
handler.process_message(data, message)
else:
message.reject_log_error(logger=LOG,
errors=(kombu_exc.MessageStateError,))

View File

@@ -36,8 +36,9 @@ class WorkerBasedActionEngine(engine.ActionEngine):
of the (PENDING, WAITING) request states. When
expired the associated task the request was made
for will have its result become a
`RequestTimeout` exception instead of its
normally returned value (or raised exception).
:py:class:`~taskflow.exceptions.RequestTimeout`
exception instead of its normally returned
value (or raised exception).
:param transport_options: transport specific options (see:
http://kombu.readthedocs.org/ for what these
options imply and are expected to be)

View File

@@ -19,6 +19,7 @@ import functools
from oslo_utils import timeutils
from taskflow.engines.action_engine import executor
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow.engines.worker_based import types as wt
@@ -44,10 +45,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._requests_cache = wt.RequestsCache()
self._transition_timeout = transition_timeout
type_handlers = {
pr.RESPONSE: [
self._process_response,
pr.Response.validate,
],
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
}
self._proxy = proxy.Proxy(uuid, exchange,
type_handlers=type_handlers,

View File

@@ -19,6 +19,7 @@ import functools
from oslo_utils import reflection
import six
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow import logging
@@ -38,14 +39,13 @@ class Server(object):
url=None, transport=None, transport_options=None,
retry_options=None):
type_handlers = {
pr.NOTIFY: [
pr.NOTIFY: dispatcher.Handler(
self._delayed_process(self._process_notify),
functools.partial(pr.Notify.validate, response=False),
],
pr.REQUEST: [
validator=functools.partial(pr.Notify.validate,
response=False)),
pr.REQUEST: dispatcher.Handler(
self._delayed_process(self._process_request),
pr.Request.validate,
],
validator=pr.Request.validate),
}
self._executor = executor
self._proxy = proxy.Proxy(topic, exchange,

View File

@@ -23,6 +23,7 @@ import threading
from oslo_utils import reflection
import six
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
from taskflow.types import cache as base
@@ -165,10 +166,10 @@ class ProxyWorkerFinder(WorkerFinder):
self._workers = {}
self._uuid = uuid
self._proxy.dispatcher.type_handlers.update({
pr.NOTIFY: [
pr.NOTIFY: dispatcher.Handler(
self._process_response,
functools.partial(pr.Notify.validate, response=True),
],
validator=functools.partial(pr.Notify.validate,
response=True)),
})
self._counter = itertools.count()

View File

@@ -40,12 +40,12 @@ def mock_acked_message(ack_ok=True, **kwargs):
class TestDispatcher(test.TestCase):
def test_creation(self):
on_hello = mock.MagicMock()
handlers = {'hello': on_hello}
handlers = {'hello': dispatcher.Handler(on_hello)}
dispatcher.TypeDispatcher(type_handlers=handlers)
def test_on_message(self):
on_hello = mock.MagicMock()
handlers = {'hello': on_hello}
handlers = {'hello': dispatcher.Handler(on_hello)}
d = dispatcher.TypeDispatcher(type_handlers=handlers)
msg = mock_acked_message(properties={'type': 'hello'})
d.on_message("", msg)
@@ -70,7 +70,7 @@ class TestDispatcher(test.TestCase):
def test_failed_ack(self):
on_hello = mock.MagicMock()
handlers = {'hello': on_hello}
handlers = {'hello': dispatcher.Handler(on_hello)}
d = dispatcher.TypeDispatcher(type_handlers=handlers)
msg = mock_acked_message(ack_ok=False,
properties={'type': 'hello'})

View File

@@ -16,6 +16,7 @@
from oslo_utils import uuidutils
from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow import test
@@ -35,7 +36,7 @@ class TestMessagePump(test.TestCase):
on_notify = mock.MagicMock()
on_notify.side_effect = lambda *args, **kwargs: barrier.set()
handlers = {pr.NOTIFY: on_notify}
handlers = {pr.NOTIFY: dispatcher.Handler(on_notify)}
p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers,
transport='memory',
transport_options={
@@ -60,7 +61,7 @@ class TestMessagePump(test.TestCase):
on_response = mock.MagicMock()
on_response.side_effect = lambda *args, **kwargs: barrier.set()
handlers = {pr.RESPONSE: on_response}
handlers = {pr.RESPONSE: dispatcher.Handler(on_response)}
p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers,
transport='memory',
transport_options={
@@ -96,9 +97,9 @@ class TestMessagePump(test.TestCase):
on_request.side_effect = countdown
handlers = {
pr.NOTIFY: on_notify,
pr.RESPONSE: on_response,
pr.REQUEST: on_request,
pr.NOTIFY: dispatcher.Handler(on_notify),
pr.RESPONSE: dispatcher.Handler(on_response),
pr.REQUEST: dispatcher.Handler(on_request),
}
p = proxy.Proxy(TEST_TOPIC, TEST_EXCHANGE, handlers,
transport='memory',