Add and use a nicer kombu message formatter

Since the kombu message object that is recieved has
no useful __str__ or __repr__ and on reception and processing
we should the message (and the delivery tag) it is nice to have
a more useful formatting of that message for debugging and
such (where more detail about the messages are quite useful
to see).

Change-Id: I6730b10122a5de1a0a074525931f312fbd97b8c0
This commit is contained in:
Joshua Harlow 2015-01-26 13:56:34 -08:00 committed by Joshua Harlow
parent 5f71412402
commit 99b92aed5d
7 changed files with 126 additions and 42 deletions

View File

@ -28,6 +28,11 @@ Kazoo
.. automodule:: taskflow.utils.kazoo_utils .. automodule:: taskflow.utils.kazoo_utils
Kombu
~~~~~
.. automodule:: taskflow.utils.kombu_utils
Locks Locks
~~~~~ ~~~~~

View File

@ -19,6 +19,7 @@ import six
from taskflow import exceptions as excp from taskflow import exceptions as excp
from taskflow import logging from taskflow import logging
from taskflow.utils import kombu_utils as ku
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -70,7 +71,7 @@ class TypeDispatcher(object):
LOG.critical("Couldn't requeue %r, reason:%r", LOG.critical("Couldn't requeue %r, reason:%r",
message.delivery_tag, exc, exc_info=True) message.delivery_tag, exc, exc_info=True)
else: else:
LOG.debug("AMQP message %r requeued.", message.delivery_tag) LOG.debug("Message '%s' was requeued.", ku.DelayedPretty(message))
def _process_message(self, data, message, message_type): def _process_message(self, data, message, message_type):
handler = self._handlers.get(message_type) handler = self._handlers.get(message_type)
@ -78,7 +79,7 @@ class TypeDispatcher(object):
message.reject_log_error(logger=LOG, message.reject_log_error(logger=LOG,
errors=(kombu_exc.MessageStateError,)) errors=(kombu_exc.MessageStateError,))
LOG.warning("Unexpected message type: '%s' in message" LOG.warning("Unexpected message type: '%s' in message"
" %r", message_type, message.delivery_tag) " '%s'", message_type, ku.DelayedPretty(message))
else: else:
if isinstance(handler, (tuple, list)): if isinstance(handler, (tuple, list)):
handler, validator = handler handler, validator = handler
@ -87,15 +88,15 @@ class TypeDispatcher(object):
except excp.InvalidFormat as e: except excp.InvalidFormat as e:
message.reject_log_error( message.reject_log_error(
logger=LOG, errors=(kombu_exc.MessageStateError,)) logger=LOG, errors=(kombu_exc.MessageStateError,))
LOG.warn("Message: %r, '%s' was rejected due to it being" LOG.warn("Message '%s' (%s) was rejected due to it being"
" in an invalid format: %s", " in an invalid format: %s",
message.delivery_tag, message_type, e) ku.DelayedPretty(message), message_type, e)
return return
message.ack_log_error(logger=LOG, message.ack_log_error(logger=LOG,
errors=(kombu_exc.MessageStateError,)) errors=(kombu_exc.MessageStateError,))
if message.acknowledged: if message.acknowledged:
LOG.debug("AMQP message %r acknowledged.", LOG.debug("Message '%s' was acknowledged.",
message.delivery_tag) ku.DelayedPretty(message))
handler(data, message) handler(data, message)
else: else:
message.reject_log_error(logger=LOG, message.reject_log_error(logger=LOG,
@ -103,7 +104,7 @@ class TypeDispatcher(object):
def on_message(self, data, message): def on_message(self, data, message):
"""This method is called on incoming messages.""" """This method is called on incoming messages."""
LOG.debug("Got message: %r", message.delivery_tag) LOG.debug("Received message '%s'", ku.DelayedPretty(message))
if self._collect_requeue_votes(data, message): if self._collect_requeue_votes(data, message):
self._requeue_log_error(message, self._requeue_log_error(message,
errors=(kombu_exc.MessageStateError,)) errors=(kombu_exc.MessageStateError,))
@ -114,6 +115,6 @@ class TypeDispatcher(object):
message.reject_log_error( message.reject_log_error(
logger=LOG, errors=(kombu_exc.MessageStateError,)) logger=LOG, errors=(kombu_exc.MessageStateError,))
LOG.warning("The 'type' message property is missing" LOG.warning("The 'type' message property is missing"
" in message %r", message.delivery_tag) " in message '%s'", ku.DelayedPretty(message))
else: else:
self._process_message(data, message, message_type) self._process_message(data, message, message_type)

View File

@ -26,6 +26,7 @@ from taskflow import exceptions as exc
from taskflow import logging from taskflow import logging
from taskflow import task as task_atom from taskflow import task as task_atom
from taskflow.types import timing as tt from taskflow.types import timing as tt
from taskflow.utils import kombu_utils as ku
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import threading_utils as tu from taskflow.utils import threading_utils as tu
@ -73,7 +74,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
def _process_notify(self, notify, message): def _process_notify(self, notify, message):
"""Process notify message from remote side.""" """Process notify message from remote side."""
LOG.debug("Started processing notify message '%s'", LOG.debug("Started processing notify message '%s'",
message.delivery_tag) ku.DelayedPretty(message))
topic = notify['topic'] topic = notify['topic']
tasks = notify['tasks'] tasks = notify['tasks']
@ -91,11 +93,13 @@ class WorkerTaskExecutor(executor.TaskExecutor):
def _process_response(self, response, message): def _process_response(self, response, message):
"""Process response from remote side.""" """Process response from remote side."""
LOG.debug("Started processing response message '%s'", LOG.debug("Started processing response message '%s'",
message.delivery_tag) ku.DelayedPretty(message))
try: try:
task_uuid = message.properties['correlation_id'] task_uuid = message.properties['correlation_id']
except KeyError: except KeyError:
LOG.warning("The 'correlation_id' message property is missing") LOG.warning("The 'correlation_id' message property is"
" missing in message '%s'",
ku.DelayedPretty(message))
else: else:
request = self._requests_cache.get(task_uuid) request = self._requests_cache.get(task_uuid)
if request is not None: if request is not None:

View File

@ -176,7 +176,8 @@ class Proxy(object):
LOG.exception('Publishing error: %s', exc) LOG.exception('Publishing error: %s', exc)
LOG.info('Retry triggering in %s seconds', interval) LOG.info('Retry triggering in %s seconds', interval)
LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys) LOG.debug("Sending '%s' message using routing keys %s",
msg, routing_keys)
with kombu.connections[self._conn].acquire(block=True) as conn: with kombu.connections[self._conn].acquire(block=True) as conn:
with conn.Producer() as producer: with conn.Producer() as producer:
ensure_kwargs = self._ensure_options.copy() ensure_kwargs = self._ensure_options.copy()

View File

@ -23,6 +23,7 @@ from taskflow.engines.worker_based import proxy
from taskflow import logging from taskflow import logging
from taskflow.types import failure as ft from taskflow.types import failure as ft
from taskflow.types import notifier as nt from taskflow.types import notifier as nt
from taskflow.utils import kombu_utils as ku
from taskflow.utils import misc from taskflow.utils import misc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -142,13 +143,14 @@ class Server(object):
def _process_notify(self, notify, message): def _process_notify(self, notify, message):
"""Process notify message and reply back.""" """Process notify message and reply back."""
LOG.debug("Started processing notify message %r", message.delivery_tag) LOG.debug("Started processing notify message '%s'",
ku.DelayedPretty(message))
try: try:
reply_to = message.properties['reply_to'] reply_to = message.properties['reply_to']
except KeyError: except KeyError:
LOG.warn("The 'reply_to' message property is missing" LOG.warn("The 'reply_to' message property is missing"
" in received notify message %r", message.delivery_tag, " in received notify message '%s'",
exc_info=True) ku.DelayedPretty(message), exc_info=True)
else: else:
response = pr.Notify(topic=self._topic, response = pr.Notify(topic=self._topic,
tasks=self._endpoints.keys()) tasks=self._endpoints.keys())
@ -156,13 +158,13 @@ class Server(object):
self._proxy.publish(response, routing_key=reply_to) self._proxy.publish(response, routing_key=reply_to)
except Exception: except Exception:
LOG.critical("Failed to send reply to '%s' with notify" LOG.critical("Failed to send reply to '%s' with notify"
" response %s", reply_to, response, " response '%s'", reply_to, response,
exc_info=True) exc_info=True)
def _process_request(self, request, message): def _process_request(self, request, message):
"""Process request message and reply back.""" """Process request message and reply back."""
LOG.debug("Started processing request message %r", LOG.debug("Started processing request message '%s'",
message.delivery_tag) ku.DelayedPretty(message))
try: try:
# NOTE(skudriashev): parse broker message first to get # NOTE(skudriashev): parse broker message first to get
# the `reply_to` and the `task_uuid` parameters to have # the `reply_to` and the `task_uuid` parameters to have
@ -170,8 +172,8 @@ class Server(object):
# in the first place...). # in the first place...).
reply_to, task_uuid = self._parse_message(message) reply_to, task_uuid = self._parse_message(message)
except ValueError: except ValueError:
LOG.warn("Failed to parse request attributes from message %r", LOG.warn("Failed to parse request attributes from message '%s'",
message.delivery_tag, exc_info=True) ku.DelayedPretty(message), exc_info=True)
return return
else: else:
# prepare reply callback # prepare reply callback
@ -185,8 +187,8 @@ class Server(object):
arguments['task_uuid'] = task_uuid arguments['task_uuid'] = task_uuid
except ValueError: except ValueError:
with misc.capture_failure() as failure: with misc.capture_failure() as failure:
LOG.warn("Failed to parse request contents from message %r", LOG.warn("Failed to parse request contents from message '%s'",
message.delivery_tag, exc_info=True) ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict()) reply_callback(result=failure.to_dict())
return return
@ -196,8 +198,8 @@ class Server(object):
except KeyError: except KeyError:
with misc.capture_failure() as failure: with misc.capture_failure() as failure:
LOG.warn("The '%s' task endpoint does not exist, unable" LOG.warn("The '%s' task endpoint does not exist, unable"
" to continue processing request message %r", " to continue processing request message '%s'",
task_cls, message.delivery_tag, exc_info=True) task_cls, ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict()) reply_callback(result=failure.to_dict())
return return
else: else:
@ -207,8 +209,8 @@ class Server(object):
with misc.capture_failure() as failure: with misc.capture_failure() as failure:
LOG.warn("The '%s' handler does not exist on task endpoint" LOG.warn("The '%s' handler does not exist on task endpoint"
" '%s', unable to continue processing request" " '%s', unable to continue processing request"
" message %r", action, endpoint, " message '%s'", action, endpoint,
message.delivery_tag, exc_info=True) ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict()) reply_callback(result=failure.to_dict())
return return
else: else:
@ -217,8 +219,8 @@ class Server(object):
except Exception: except Exception:
with misc.capture_failure() as failure: with misc.capture_failure() as failure:
LOG.warn("The '%s' task '%s' generation for request" LOG.warn("The '%s' task '%s' generation for request"
" message %r failed", endpoint, action, " message '%s' failed", endpoint, action,
message.delivery_tag, exc_info=True) ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict()) reply_callback(result=failure.to_dict())
return return
else: else:
@ -245,8 +247,8 @@ class Server(object):
except Exception: except Exception:
with misc.capture_failure() as failure: with misc.capture_failure() as failure:
LOG.warn("The '%s' endpoint '%s' execution for request" LOG.warn("The '%s' endpoint '%s' execution for request"
" message %r failed", endpoint, action, " message '%s' failed", endpoint, action,
message.delivery_tag, exc_info=True) ku.DelayedPretty(message), exc_info=True)
reply_callback(result=failure.to_dict()) reply_callback(result=failure.to_dict())
else: else:
if isinstance(result, ft.Failure): if isinstance(result, ft.Failure):

View File

@ -154,7 +154,7 @@ class TestServer(test.MockTestCase):
s = self.server(reset_master_mock=True) s = self.server(reset_master_mock=True)
s._reply(True, self.reply_to, self.task_uuid) s._reply(True, self.reply_to, self.task_uuid)
self.assertEqual(self.master_mock.mock_calls, [ self.master_mock.assert_has_calls([
mock.call.Response(pr.FAILURE), mock.call.Response(pr.FAILURE),
mock.call.proxy.publish(self.response_inst_mock, self.reply_to, mock.call.proxy.publish(self.response_inst_mock, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
@ -195,7 +195,7 @@ class TestServer(test.MockTestCase):
mock.call.proxy.publish(self.response_inst_mock, self.reply_to, mock.call.proxy.publish(self.response_inst_mock, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
] ]
self.assertEqual(master_mock_calls, self.master_mock.mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
def test_process_request(self): def test_process_request(self):
# create server and process request # create server and process request
@ -211,7 +211,7 @@ class TestServer(test.MockTestCase):
mock.call.proxy.publish(self.response_inst_mock, self.reply_to, mock.call.proxy.publish(self.response_inst_mock, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
@mock.patch("taskflow.engines.worker_based.server.LOG.warn") @mock.patch("taskflow.engines.worker_based.server.LOG.warn")
def test_process_request_parse_message_failure(self, mocked_exception): def test_process_request_parse_message_failure(self, mocked_exception):
@ -219,8 +219,6 @@ class TestServer(test.MockTestCase):
request = self.make_request() request = self.make_request()
s = self.server(reset_master_mock=True) s = self.server(reset_master_mock=True)
s._process_request(request, self.message_mock) s._process_request(request, self.message_mock)
self.assertEqual(self.master_mock.mock_calls, [])
self.assertTrue(mocked_exception.called) self.assertTrue(mocked_exception.called)
@mock.patch.object(failure.Failure, 'from_dict') @mock.patch.object(failure.Failure, 'from_dict')
@ -245,7 +243,7 @@ class TestServer(test.MockTestCase):
self.reply_to, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
] ]
self.assertEqual(master_mock_calls, self.master_mock.mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
@mock.patch.object(failure.Failure, 'to_dict') @mock.patch.object(failure.Failure, 'to_dict')
def test_process_request_endpoint_not_found(self, to_mock): def test_process_request_endpoint_not_found(self, to_mock):
@ -266,7 +264,7 @@ class TestServer(test.MockTestCase):
self.reply_to, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
@mock.patch.object(failure.Failure, 'to_dict') @mock.patch.object(failure.Failure, 'to_dict')
def test_process_request_execution_failure(self, to_mock): def test_process_request_execution_failure(self, to_mock):
@ -288,7 +286,7 @@ class TestServer(test.MockTestCase):
self.reply_to, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
@mock.patch.object(failure.Failure, 'to_dict') @mock.patch.object(failure.Failure, 'to_dict')
def test_process_request_task_failure(self, to_mock): def test_process_request_task_failure(self, to_mock):
@ -312,7 +310,7 @@ class TestServer(test.MockTestCase):
self.reply_to, self.reply_to,
correlation_id=self.task_uuid) correlation_id=self.task_uuid)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
def test_start(self): def test_start(self):
self.server(reset_master_mock=True).start() self.server(reset_master_mock=True).start()
@ -321,7 +319,7 @@ class TestServer(test.MockTestCase):
master_mock_calls = [ master_mock_calls = [
mock.call.proxy.start() mock.call.proxy.start()
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
def test_wait(self): def test_wait(self):
server = self.server(reset_master_mock=True) server = self.server(reset_master_mock=True)
@ -333,7 +331,7 @@ class TestServer(test.MockTestCase):
mock.call.proxy.start(), mock.call.proxy.start(),
mock.call.proxy.wait() mock.call.proxy.wait()
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)
def test_stop(self): def test_stop(self):
self.server(reset_master_mock=True).stop() self.server(reset_master_mock=True).stop()
@ -342,4 +340,4 @@ class TestServer(test.MockTestCase):
master_mock_calls = [ master_mock_calls = [
mock.call.proxy.stop() mock.call.proxy.stop()
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.master_mock.assert_has_calls(master_mock_calls)

View File

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Keys extracted from the message properties when formatting...
_MSG_PROPERTIES = tuple([
'correlation_id',
'delivery_info/routing_key',
'type',
])
class DelayedPretty(object):
"""Wraps a message and delays prettifying it until requested."""
def __init__(self, message):
self._message = message
self._message_pretty = None
def __str__(self):
if self._message_pretty is None:
self._message_pretty = _prettify_message(self._message)
return self._message_pretty
def _get_deep(properties, *keys):
"""Get a final key among a list of keys (each with its own sub-dict)."""
for key in keys:
properties = properties[key]
return properties
def _prettify_message(message):
"""Kombu doesn't currently have a useful ``__str__()`` or ``__repr__()``.
This provides something decent(ish) for debugging (or other purposes) so
that messages are more nice and understandable....
TODO(harlowja): submit something into kombu to fix/adjust this.
"""
if message.content_type is not None:
properties = {
'content_type': message.content_type,
}
else:
properties = {}
for name in _MSG_PROPERTIES:
segments = name.split("/")
try:
value = _get_deep(message.properties, *segments)
except (KeyError, ValueError, TypeError):
pass
else:
if value is not None:
properties[segments[-1]] = value
if message.body is not None:
properties['body_length'] = len(message.body)
return "%(delivery_tag)s: %(properties)s" % {
'delivery_tag': message.delivery_tag,
'properties': properties,
}