From 99b92aed5de1d0868266ce63dec4f3b0e4a636d1 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 26 Jan 2015 13:56:34 -0800 Subject: [PATCH] Add and use a nicer kombu message formatter Since the kombu message object that is recieved has no useful __str__ or __repr__ and on reception and processing we should the message (and the delivery tag) it is nice to have a more useful formatting of that message for debugging and such (where more detail about the messages are quite useful to see). Change-Id: I6730b10122a5de1a0a074525931f312fbd97b8c0 --- doc/source/utils.rst | 5 ++ taskflow/engines/worker_based/dispatcher.py | 17 +++-- taskflow/engines/worker_based/executor.py | 10 ++- taskflow/engines/worker_based/proxy.py | 3 +- taskflow/engines/worker_based/server.py | 38 +++++----- .../tests/unit/worker_based/test_server.py | 22 +++--- taskflow/utils/kombu_utils.py | 73 +++++++++++++++++++ 7 files changed, 126 insertions(+), 42 deletions(-) create mode 100644 taskflow/utils/kombu_utils.py diff --git a/doc/source/utils.rst b/doc/source/utils.rst index d3c726f0f..1f7746638 100644 --- a/doc/source/utils.rst +++ b/doc/source/utils.rst @@ -28,6 +28,11 @@ Kazoo .. automodule:: taskflow.utils.kazoo_utils +Kombu +~~~~~ + +.. automodule:: taskflow.utils.kombu_utils + Locks ~~~~~ diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py index 7ea8947c4..385fd13d2 100644 --- a/taskflow/engines/worker_based/dispatcher.py +++ b/taskflow/engines/worker_based/dispatcher.py @@ -19,6 +19,7 @@ import six from taskflow import exceptions as excp from taskflow import logging +from taskflow.utils import kombu_utils as ku LOG = logging.getLogger(__name__) @@ -70,7 +71,7 @@ class TypeDispatcher(object): LOG.critical("Couldn't requeue %r, reason:%r", message.delivery_tag, exc, exc_info=True) else: - LOG.debug("AMQP message %r requeued.", message.delivery_tag) + LOG.debug("Message '%s' was requeued.", ku.DelayedPretty(message)) def _process_message(self, data, message, message_type): handler = self._handlers.get(message_type) @@ -78,7 +79,7 @@ class TypeDispatcher(object): message.reject_log_error(logger=LOG, errors=(kombu_exc.MessageStateError,)) LOG.warning("Unexpected message type: '%s' in message" - " %r", message_type, message.delivery_tag) + " '%s'", message_type, ku.DelayedPretty(message)) else: if isinstance(handler, (tuple, list)): handler, validator = handler @@ -87,15 +88,15 @@ class TypeDispatcher(object): except excp.InvalidFormat as e: message.reject_log_error( logger=LOG, errors=(kombu_exc.MessageStateError,)) - LOG.warn("Message: %r, '%s' was rejected due to it being" + LOG.warn("Message '%s' (%s) was rejected due to it being" " in an invalid format: %s", - message.delivery_tag, message_type, e) + ku.DelayedPretty(message), message_type, e) return message.ack_log_error(logger=LOG, errors=(kombu_exc.MessageStateError,)) if message.acknowledged: - LOG.debug("AMQP message %r acknowledged.", - message.delivery_tag) + LOG.debug("Message '%s' was acknowledged.", + ku.DelayedPretty(message)) handler(data, message) else: message.reject_log_error(logger=LOG, @@ -103,7 +104,7 @@ class TypeDispatcher(object): def on_message(self, data, message): """This method is called on incoming messages.""" - LOG.debug("Got message: %r", message.delivery_tag) + LOG.debug("Received message '%s'", ku.DelayedPretty(message)) if self._collect_requeue_votes(data, message): self._requeue_log_error(message, errors=(kombu_exc.MessageStateError,)) @@ -114,6 +115,6 @@ class TypeDispatcher(object): message.reject_log_error( logger=LOG, errors=(kombu_exc.MessageStateError,)) LOG.warning("The 'type' message property is missing" - " in message %r", message.delivery_tag) + " in message '%s'", ku.DelayedPretty(message)) else: self._process_message(data, message, message_type) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 8290ba616..6ece1b84e 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -26,6 +26,7 @@ from taskflow import exceptions as exc from taskflow import logging from taskflow import task as task_atom from taskflow.types import timing as tt +from taskflow.utils import kombu_utils as ku from taskflow.utils import misc from taskflow.utils import threading_utils as tu @@ -73,7 +74,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): def _process_notify(self, notify, message): """Process notify message from remote side.""" LOG.debug("Started processing notify message '%s'", - message.delivery_tag) + ku.DelayedPretty(message)) + topic = notify['topic'] tasks = notify['tasks'] @@ -91,11 +93,13 @@ class WorkerTaskExecutor(executor.TaskExecutor): def _process_response(self, response, message): """Process response from remote side.""" LOG.debug("Started processing response message '%s'", - message.delivery_tag) + ku.DelayedPretty(message)) try: task_uuid = message.properties['correlation_id'] except KeyError: - LOG.warning("The 'correlation_id' message property is missing") + LOG.warning("The 'correlation_id' message property is" + " missing in message '%s'", + ku.DelayedPretty(message)) else: request = self._requests_cache.get(task_uuid) if request is not None: diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 02639270b..505ead2aa 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -176,7 +176,8 @@ class Proxy(object): LOG.exception('Publishing error: %s', exc) LOG.info('Retry triggering in %s seconds', interval) - LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys) + LOG.debug("Sending '%s' message using routing keys %s", + msg, routing_keys) with kombu.connections[self._conn].acquire(block=True) as conn: with conn.Producer() as producer: ensure_kwargs = self._ensure_options.copy() diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 6e64f1cd1..5bb0b2ab8 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -23,6 +23,7 @@ from taskflow.engines.worker_based import proxy from taskflow import logging from taskflow.types import failure as ft from taskflow.types import notifier as nt +from taskflow.utils import kombu_utils as ku from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -142,13 +143,14 @@ class Server(object): def _process_notify(self, notify, message): """Process notify message and reply back.""" - LOG.debug("Started processing notify message %r", message.delivery_tag) + LOG.debug("Started processing notify message '%s'", + ku.DelayedPretty(message)) try: reply_to = message.properties['reply_to'] except KeyError: LOG.warn("The 'reply_to' message property is missing" - " in received notify message %r", message.delivery_tag, - exc_info=True) + " in received notify message '%s'", + ku.DelayedPretty(message), exc_info=True) else: response = pr.Notify(topic=self._topic, tasks=self._endpoints.keys()) @@ -156,13 +158,13 @@ class Server(object): self._proxy.publish(response, routing_key=reply_to) except Exception: LOG.critical("Failed to send reply to '%s' with notify" - " response %s", reply_to, response, + " response '%s'", reply_to, response, exc_info=True) def _process_request(self, request, message): """Process request message and reply back.""" - LOG.debug("Started processing request message %r", - message.delivery_tag) + LOG.debug("Started processing request message '%s'", + ku.DelayedPretty(message)) try: # NOTE(skudriashev): parse broker message first to get # the `reply_to` and the `task_uuid` parameters to have @@ -170,8 +172,8 @@ class Server(object): # in the first place...). reply_to, task_uuid = self._parse_message(message) except ValueError: - LOG.warn("Failed to parse request attributes from message %r", - message.delivery_tag, exc_info=True) + LOG.warn("Failed to parse request attributes from message '%s'", + ku.DelayedPretty(message), exc_info=True) return else: # prepare reply callback @@ -185,8 +187,8 @@ class Server(object): arguments['task_uuid'] = task_uuid except ValueError: with misc.capture_failure() as failure: - LOG.warn("Failed to parse request contents from message %r", - message.delivery_tag, exc_info=True) + LOG.warn("Failed to parse request contents from message '%s'", + ku.DelayedPretty(message), exc_info=True) reply_callback(result=failure.to_dict()) return @@ -196,8 +198,8 @@ class Server(object): except KeyError: with misc.capture_failure() as failure: LOG.warn("The '%s' task endpoint does not exist, unable" - " to continue processing request message %r", - task_cls, message.delivery_tag, exc_info=True) + " to continue processing request message '%s'", + task_cls, ku.DelayedPretty(message), exc_info=True) reply_callback(result=failure.to_dict()) return else: @@ -207,8 +209,8 @@ class Server(object): with misc.capture_failure() as failure: LOG.warn("The '%s' handler does not exist on task endpoint" " '%s', unable to continue processing request" - " message %r", action, endpoint, - message.delivery_tag, exc_info=True) + " message '%s'", action, endpoint, + ku.DelayedPretty(message), exc_info=True) reply_callback(result=failure.to_dict()) return else: @@ -217,8 +219,8 @@ class Server(object): except Exception: with misc.capture_failure() as failure: LOG.warn("The '%s' task '%s' generation for request" - " message %r failed", endpoint, action, - message.delivery_tag, exc_info=True) + " message '%s' failed", endpoint, action, + ku.DelayedPretty(message), exc_info=True) reply_callback(result=failure.to_dict()) return else: @@ -245,8 +247,8 @@ class Server(object): except Exception: with misc.capture_failure() as failure: LOG.warn("The '%s' endpoint '%s' execution for request" - " message %r failed", endpoint, action, - message.delivery_tag, exc_info=True) + " message '%s' failed", endpoint, action, + ku.DelayedPretty(message), exc_info=True) reply_callback(result=failure.to_dict()) else: if isinstance(result, ft.Failure): diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 7a77e8df7..1fb8aa5c5 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -154,7 +154,7 @@ class TestServer(test.MockTestCase): s = self.server(reset_master_mock=True) s._reply(True, self.reply_to, self.task_uuid) - self.assertEqual(self.master_mock.mock_calls, [ + self.master_mock.assert_has_calls([ mock.call.Response(pr.FAILURE), mock.call.proxy.publish(self.response_inst_mock, self.reply_to, correlation_id=self.task_uuid) @@ -195,7 +195,7 @@ class TestServer(test.MockTestCase): mock.call.proxy.publish(self.response_inst_mock, self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(master_mock_calls, self.master_mock.mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) def test_process_request(self): # create server and process request @@ -211,7 +211,7 @@ class TestServer(test.MockTestCase): mock.call.proxy.publish(self.response_inst_mock, self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) @mock.patch("taskflow.engines.worker_based.server.LOG.warn") def test_process_request_parse_message_failure(self, mocked_exception): @@ -219,8 +219,6 @@ class TestServer(test.MockTestCase): request = self.make_request() s = self.server(reset_master_mock=True) s._process_request(request, self.message_mock) - - self.assertEqual(self.master_mock.mock_calls, []) self.assertTrue(mocked_exception.called) @mock.patch.object(failure.Failure, 'from_dict') @@ -245,7 +243,7 @@ class TestServer(test.MockTestCase): self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(master_mock_calls, self.master_mock.mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_endpoint_not_found(self, to_mock): @@ -266,7 +264,7 @@ class TestServer(test.MockTestCase): self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_execution_failure(self, to_mock): @@ -288,7 +286,7 @@ class TestServer(test.MockTestCase): self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_task_failure(self, to_mock): @@ -312,7 +310,7 @@ class TestServer(test.MockTestCase): self.reply_to, correlation_id=self.task_uuid) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) def test_start(self): self.server(reset_master_mock=True).start() @@ -321,7 +319,7 @@ class TestServer(test.MockTestCase): master_mock_calls = [ mock.call.proxy.start() ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) def test_wait(self): server = self.server(reset_master_mock=True) @@ -333,7 +331,7 @@ class TestServer(test.MockTestCase): mock.call.proxy.start(), mock.call.proxy.wait() ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) def test_stop(self): self.server(reset_master_mock=True).stop() @@ -342,4 +340,4 @@ class TestServer(test.MockTestCase): master_mock_calls = [ mock.call.proxy.stop() ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.master_mock.assert_has_calls(master_mock_calls) diff --git a/taskflow/utils/kombu_utils.py b/taskflow/utils/kombu_utils.py new file mode 100644 index 000000000..8ace067bc --- /dev/null +++ b/taskflow/utils/kombu_utils.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Keys extracted from the message properties when formatting... +_MSG_PROPERTIES = tuple([ + 'correlation_id', + 'delivery_info/routing_key', + 'type', +]) + + +class DelayedPretty(object): + """Wraps a message and delays prettifying it until requested.""" + + def __init__(self, message): + self._message = message + self._message_pretty = None + + def __str__(self): + if self._message_pretty is None: + self._message_pretty = _prettify_message(self._message) + return self._message_pretty + + +def _get_deep(properties, *keys): + """Get a final key among a list of keys (each with its own sub-dict).""" + for key in keys: + properties = properties[key] + return properties + + +def _prettify_message(message): + """Kombu doesn't currently have a useful ``__str__()`` or ``__repr__()``. + + This provides something decent(ish) for debugging (or other purposes) so + that messages are more nice and understandable.... + + TODO(harlowja): submit something into kombu to fix/adjust this. + """ + if message.content_type is not None: + properties = { + 'content_type': message.content_type, + } + else: + properties = {} + for name in _MSG_PROPERTIES: + segments = name.split("/") + try: + value = _get_deep(message.properties, *segments) + except (KeyError, ValueError, TypeError): + pass + else: + if value is not None: + properties[segments[-1]] = value + if message.body is not None: + properties['body_length'] = len(message.body) + return "%(delivery_tag)s: %(properties)s" % { + 'delivery_tag': message.delivery_tag, + 'properties': properties, + }