Allow handlers to provide validation callables
In order to reject messages before they are processed make it possible to provide a handler pair that will validate the incoming message and either reject or allow the message to be passed to the provided handler. Part of blueprint wbe-message-validation Change-Id: Ibd6ee40020c6b98283f40d5bd59e8950d63b7f71
This commit is contained in:
@@ -19,6 +19,8 @@ import logging
|
||||
from kombu import exceptions as kombu_exc
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as excp
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -66,6 +68,32 @@ class TypeDispatcher(object):
|
||||
else:
|
||||
LOG.debug("AMQP message %r requeued.", message.delivery_tag)
|
||||
|
||||
def _process_message(self, data, message, message_type):
|
||||
handler = self._handlers.get(message_type)
|
||||
if handler is None:
|
||||
message.reject_log_error(logger=LOG,
|
||||
errors=(kombu_exc.MessageStateError,))
|
||||
LOG.warning("Unexpected message type: '%s' in message"
|
||||
" %r", message_type, message.delivery_tag)
|
||||
else:
|
||||
if isinstance(handler, (tuple, list)):
|
||||
handler, validator = handler
|
||||
try:
|
||||
validator(data)
|
||||
except excp.InvalidFormat as e:
|
||||
message.reject_log_error(
|
||||
logger=LOG, errors=(kombu_exc.MessageStateError,))
|
||||
LOG.warn("Message: %r, '%s' was rejected due to it being"
|
||||
" in an invalid format: %s",
|
||||
message.delivery_tag, message_type, e)
|
||||
return
|
||||
message.ack_log_error(logger=LOG,
|
||||
errors=(kombu_exc.MessageStateError,))
|
||||
if message.acknowledged:
|
||||
LOG.debug("AMQP message %r acknowledged.",
|
||||
message.delivery_tag)
|
||||
handler(data, message)
|
||||
|
||||
def on_message(self, data, message):
|
||||
"""This method is called on incoming messages."""
|
||||
LOG.debug("Got message: %r", message.delivery_tag)
|
||||
@@ -74,23 +102,11 @@ class TypeDispatcher(object):
|
||||
errors=(kombu_exc.MessageStateError,))
|
||||
else:
|
||||
try:
|
||||
msg_type = message.properties['type']
|
||||
message_type = message.properties['type']
|
||||
except KeyError:
|
||||
message.reject_log_error(
|
||||
logger=LOG, errors=(kombu_exc.MessageStateError,))
|
||||
LOG.warning("The 'type' message property is missing"
|
||||
" in message %r", message.delivery_tag)
|
||||
else:
|
||||
handler = self._handlers.get(msg_type)
|
||||
if handler is None:
|
||||
message.reject_log_error(
|
||||
logger=LOG, errors=(kombu_exc.MessageStateError,))
|
||||
LOG.warning("Unexpected message type: '%s' in message"
|
||||
" %r", msg_type, message.delivery_tag)
|
||||
else:
|
||||
message.ack_log_error(
|
||||
logger=LOG, errors=(kombu_exc.MessageStateError,))
|
||||
if message.acknowledged:
|
||||
LOG.debug("AMQP message %r acknowledged.",
|
||||
message.delivery_tag)
|
||||
handler(data, message)
|
||||
self._process_message(data, message, message_type)
|
||||
|
||||
@@ -129,6 +129,10 @@ class MultipleChoices(TaskFlowException):
|
||||
"""Raised when some decision can't be made due to many possible choices."""
|
||||
|
||||
|
||||
class InvalidFormat(TaskFlowException):
|
||||
"""Raised when some object/entity is not in the expected format."""
|
||||
|
||||
|
||||
# Others.
|
||||
|
||||
class WrappedFailure(Exception):
|
||||
|
||||
Reference in New Issue
Block a user