diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py index 4983f6d5..9ff8ac10 100644 --- a/taskflow/engines/worker_based/dispatcher.py +++ b/taskflow/engines/worker_based/dispatcher.py @@ -19,6 +19,8 @@ import logging from kombu import exceptions as kombu_exc import six +from taskflow import exceptions as excp + LOG = logging.getLogger(__name__) @@ -66,6 +68,32 @@ class TypeDispatcher(object): else: LOG.debug("AMQP message %r requeued.", message.delivery_tag) + def _process_message(self, data, message, message_type): + handler = self._handlers.get(message_type) + if handler is None: + message.reject_log_error(logger=LOG, + errors=(kombu_exc.MessageStateError,)) + LOG.warning("Unexpected message type: '%s' in message" + " %r", message_type, message.delivery_tag) + else: + if isinstance(handler, (tuple, list)): + handler, validator = handler + try: + validator(data) + except excp.InvalidFormat as e: + message.reject_log_error( + logger=LOG, errors=(kombu_exc.MessageStateError,)) + LOG.warn("Message: %r, '%s' was rejected due to it being" + " in an invalid format: %s", + message.delivery_tag, message_type, e) + return + message.ack_log_error(logger=LOG, + errors=(kombu_exc.MessageStateError,)) + if message.acknowledged: + LOG.debug("AMQP message %r acknowledged.", + message.delivery_tag) + handler(data, message) + def on_message(self, data, message): """This method is called on incoming messages.""" LOG.debug("Got message: %r", message.delivery_tag) @@ -74,23 +102,11 @@ class TypeDispatcher(object): errors=(kombu_exc.MessageStateError,)) else: try: - msg_type = message.properties['type'] + message_type = message.properties['type'] except KeyError: message.reject_log_error( logger=LOG, errors=(kombu_exc.MessageStateError,)) LOG.warning("The 'type' message property is missing" " in message %r", message.delivery_tag) else: - handler = self._handlers.get(msg_type) - if handler is None: - message.reject_log_error( - logger=LOG, errors=(kombu_exc.MessageStateError,)) - LOG.warning("Unexpected message type: '%s' in message" - " %r", msg_type, message.delivery_tag) - else: - message.ack_log_error( - logger=LOG, errors=(kombu_exc.MessageStateError,)) - if message.acknowledged: - LOG.debug("AMQP message %r acknowledged.", - message.delivery_tag) - handler(data, message) + self._process_message(data, message, message_type) diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 55c889ca..e5b9a9c2 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -129,6 +129,10 @@ class MultipleChoices(TaskFlowException): """Raised when some decision can't be made due to many possible choices.""" +class InvalidFormat(TaskFlowException): + """Raised when some object/entity is not in the expected format.""" + + # Others. class WrappedFailure(Exception):