Merge "Add and use a nicer kombu message formatter"
This commit is contained in:
		| @@ -28,6 +28,11 @@ Kazoo | ||||
|  | ||||
| .. automodule:: taskflow.utils.kazoo_utils | ||||
|  | ||||
| Kombu | ||||
| ~~~~~ | ||||
|  | ||||
| .. automodule:: taskflow.utils.kombu_utils | ||||
|  | ||||
| Locks | ||||
| ~~~~~ | ||||
|  | ||||
|   | ||||
| @@ -19,6 +19,7 @@ import six | ||||
|  | ||||
| from taskflow import exceptions as excp | ||||
| from taskflow import logging | ||||
| from taskflow.utils import kombu_utils as ku | ||||
|  | ||||
| LOG = logging.getLogger(__name__) | ||||
|  | ||||
| @@ -70,7 +71,7 @@ class TypeDispatcher(object): | ||||
|             LOG.critical("Couldn't requeue %r, reason:%r", | ||||
|                          message.delivery_tag, exc, exc_info=True) | ||||
|         else: | ||||
|             LOG.debug("AMQP message %r requeued.", message.delivery_tag) | ||||
|             LOG.debug("Message '%s' was requeued.", ku.DelayedPretty(message)) | ||||
|  | ||||
|     def _process_message(self, data, message, message_type): | ||||
|         handler = self._handlers.get(message_type) | ||||
| @@ -78,7 +79,7 @@ class TypeDispatcher(object): | ||||
|             message.reject_log_error(logger=LOG, | ||||
|                                      errors=(kombu_exc.MessageStateError,)) | ||||
|             LOG.warning("Unexpected message type: '%s' in message" | ||||
|                         " %r", message_type, message.delivery_tag) | ||||
|                         " '%s'", message_type, ku.DelayedPretty(message)) | ||||
|         else: | ||||
|             if isinstance(handler, (tuple, list)): | ||||
|                 handler, validator = handler | ||||
| @@ -87,15 +88,15 @@ class TypeDispatcher(object): | ||||
|                 except excp.InvalidFormat as e: | ||||
|                     message.reject_log_error( | ||||
|                         logger=LOG, errors=(kombu_exc.MessageStateError,)) | ||||
|                     LOG.warn("Message: %r, '%s' was rejected due to it being" | ||||
|                     LOG.warn("Message '%s' (%s) was rejected due to it being" | ||||
|                              " in an invalid format: %s", | ||||
|                              message.delivery_tag, message_type, e) | ||||
|                              ku.DelayedPretty(message), message_type, e) | ||||
|                     return | ||||
|             message.ack_log_error(logger=LOG, | ||||
|                                   errors=(kombu_exc.MessageStateError,)) | ||||
|             if message.acknowledged: | ||||
|                 LOG.debug("AMQP message %r acknowledged.", | ||||
|                           message.delivery_tag) | ||||
|                 LOG.debug("Message '%s' was acknowledged.", | ||||
|                           ku.DelayedPretty(message)) | ||||
|                 handler(data, message) | ||||
|             else: | ||||
|                 message.reject_log_error(logger=LOG, | ||||
| @@ -103,7 +104,7 @@ class TypeDispatcher(object): | ||||
|  | ||||
|     def on_message(self, data, message): | ||||
|         """This method is called on incoming messages.""" | ||||
|         LOG.debug("Got message: %r", message.delivery_tag) | ||||
|         LOG.debug("Received message '%s'", ku.DelayedPretty(message)) | ||||
|         if self._collect_requeue_votes(data, message): | ||||
|             self._requeue_log_error(message, | ||||
|                                     errors=(kombu_exc.MessageStateError,)) | ||||
| @@ -114,6 +115,6 @@ class TypeDispatcher(object): | ||||
|                 message.reject_log_error( | ||||
|                     logger=LOG, errors=(kombu_exc.MessageStateError,)) | ||||
|                 LOG.warning("The 'type' message property is missing" | ||||
|                             " in message %r", message.delivery_tag) | ||||
|                             " in message '%s'", ku.DelayedPretty(message)) | ||||
|             else: | ||||
|                 self._process_message(data, message, message_type) | ||||
|   | ||||
| @@ -26,6 +26,7 @@ from taskflow import exceptions as exc | ||||
| from taskflow import logging | ||||
| from taskflow import task as task_atom | ||||
| from taskflow.types import timing as tt | ||||
| from taskflow.utils import kombu_utils as ku | ||||
| from taskflow.utils import misc | ||||
| from taskflow.utils import threading_utils as tu | ||||
|  | ||||
| @@ -73,7 +74,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): | ||||
|     def _process_notify(self, notify, message): | ||||
|         """Process notify message from remote side.""" | ||||
|         LOG.debug("Started processing notify message '%s'", | ||||
|                   message.delivery_tag) | ||||
|                   ku.DelayedPretty(message)) | ||||
|  | ||||
|         topic = notify['topic'] | ||||
|         tasks = notify['tasks'] | ||||
|  | ||||
| @@ -91,11 +93,13 @@ class WorkerTaskExecutor(executor.TaskExecutor): | ||||
|     def _process_response(self, response, message): | ||||
|         """Process response from remote side.""" | ||||
|         LOG.debug("Started processing response message '%s'", | ||||
|                   message.delivery_tag) | ||||
|                   ku.DelayedPretty(message)) | ||||
|         try: | ||||
|             task_uuid = message.properties['correlation_id'] | ||||
|         except KeyError: | ||||
|             LOG.warning("The 'correlation_id' message property is missing") | ||||
|             LOG.warning("The 'correlation_id' message property is" | ||||
|                         " missing in message '%s'", | ||||
|                         ku.DelayedPretty(message)) | ||||
|         else: | ||||
|             request = self._requests_cache.get(task_uuid) | ||||
|             if request is not None: | ||||
|   | ||||
| @@ -176,7 +176,8 @@ class Proxy(object): | ||||
|             LOG.exception('Publishing error: %s', exc) | ||||
|             LOG.info('Retry triggering in %s seconds', interval) | ||||
|  | ||||
|         LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys) | ||||
|         LOG.debug("Sending '%s' message using routing keys %s", | ||||
|                   msg, routing_keys) | ||||
|         with kombu.connections[self._conn].acquire(block=True) as conn: | ||||
|             with conn.Producer() as producer: | ||||
|                 ensure_kwargs = self._ensure_options.copy() | ||||
|   | ||||
| @@ -23,6 +23,7 @@ from taskflow.engines.worker_based import proxy | ||||
| from taskflow import logging | ||||
| from taskflow.types import failure as ft | ||||
| from taskflow.types import notifier as nt | ||||
| from taskflow.utils import kombu_utils as ku | ||||
| from taskflow.utils import misc | ||||
|  | ||||
| LOG = logging.getLogger(__name__) | ||||
| @@ -142,13 +143,14 @@ class Server(object): | ||||
|  | ||||
|     def _process_notify(self, notify, message): | ||||
|         """Process notify message and reply back.""" | ||||
|         LOG.debug("Started processing notify message %r", message.delivery_tag) | ||||
|         LOG.debug("Started processing notify message '%s'", | ||||
|                   ku.DelayedPretty(message)) | ||||
|         try: | ||||
|             reply_to = message.properties['reply_to'] | ||||
|         except KeyError: | ||||
|             LOG.warn("The 'reply_to' message property is missing" | ||||
|                      " in received notify message %r", message.delivery_tag, | ||||
|                      exc_info=True) | ||||
|                      " in received notify message '%s'", | ||||
|                      ku.DelayedPretty(message), exc_info=True) | ||||
|         else: | ||||
|             response = pr.Notify(topic=self._topic, | ||||
|                                  tasks=self._endpoints.keys()) | ||||
| @@ -156,13 +158,13 @@ class Server(object): | ||||
|                 self._proxy.publish(response, routing_key=reply_to) | ||||
|             except Exception: | ||||
|                 LOG.critical("Failed to send reply to '%s' with notify" | ||||
|                              " response %s", reply_to, response, | ||||
|                              " response '%s'", reply_to, response, | ||||
|                              exc_info=True) | ||||
|  | ||||
|     def _process_request(self, request, message): | ||||
|         """Process request message and reply back.""" | ||||
|         LOG.debug("Started processing request message %r", | ||||
|                   message.delivery_tag) | ||||
|         LOG.debug("Started processing request message '%s'", | ||||
|                   ku.DelayedPretty(message)) | ||||
|         try: | ||||
|             # NOTE(skudriashev): parse broker message first to get | ||||
|             # the `reply_to` and the `task_uuid` parameters to have | ||||
| @@ -170,8 +172,8 @@ class Server(object): | ||||
|             # in the first place...). | ||||
|             reply_to, task_uuid = self._parse_message(message) | ||||
|         except ValueError: | ||||
|             LOG.warn("Failed to parse request attributes from message %r", | ||||
|                      message.delivery_tag, exc_info=True) | ||||
|             LOG.warn("Failed to parse request attributes from message '%s'", | ||||
|                      ku.DelayedPretty(message), exc_info=True) | ||||
|             return | ||||
|         else: | ||||
|             # prepare reply callback | ||||
| @@ -185,8 +187,8 @@ class Server(object): | ||||
|             arguments['task_uuid'] = task_uuid | ||||
|         except ValueError: | ||||
|             with misc.capture_failure() as failure: | ||||
|                 LOG.warn("Failed to parse request contents from message %r", | ||||
|                          message.delivery_tag, exc_info=True) | ||||
|                 LOG.warn("Failed to parse request contents from message '%s'", | ||||
|                          ku.DelayedPretty(message), exc_info=True) | ||||
|                 reply_callback(result=failure.to_dict()) | ||||
|                 return | ||||
|  | ||||
| @@ -196,8 +198,8 @@ class Server(object): | ||||
|         except KeyError: | ||||
|             with misc.capture_failure() as failure: | ||||
|                 LOG.warn("The '%s' task endpoint does not exist, unable" | ||||
|                          " to continue processing request message %r", | ||||
|                          task_cls, message.delivery_tag, exc_info=True) | ||||
|                          " to continue processing request message '%s'", | ||||
|                          task_cls, ku.DelayedPretty(message), exc_info=True) | ||||
|                 reply_callback(result=failure.to_dict()) | ||||
|                 return | ||||
|         else: | ||||
| @@ -207,8 +209,8 @@ class Server(object): | ||||
|                 with misc.capture_failure() as failure: | ||||
|                     LOG.warn("The '%s' handler does not exist on task endpoint" | ||||
|                              " '%s', unable to continue processing request" | ||||
|                              " message %r", action, endpoint, | ||||
|                              message.delivery_tag, exc_info=True) | ||||
|                              " message '%s'", action, endpoint, | ||||
|                              ku.DelayedPretty(message), exc_info=True) | ||||
|                     reply_callback(result=failure.to_dict()) | ||||
|                     return | ||||
|             else: | ||||
| @@ -217,8 +219,8 @@ class Server(object): | ||||
|                 except Exception: | ||||
|                     with misc.capture_failure() as failure: | ||||
|                         LOG.warn("The '%s' task '%s' generation for request" | ||||
|                                  " message %r failed", endpoint, action, | ||||
|                                  message.delivery_tag, exc_info=True) | ||||
|                                  " message '%s' failed", endpoint, action, | ||||
|                                  ku.DelayedPretty(message), exc_info=True) | ||||
|                         reply_callback(result=failure.to_dict()) | ||||
|                         return | ||||
|                 else: | ||||
| @@ -245,8 +247,8 @@ class Server(object): | ||||
|         except Exception: | ||||
|             with misc.capture_failure() as failure: | ||||
|                 LOG.warn("The '%s' endpoint '%s' execution for request" | ||||
|                          " message %r failed", endpoint, action, | ||||
|                          message.delivery_tag, exc_info=True) | ||||
|                          " message '%s' failed", endpoint, action, | ||||
|                          ku.DelayedPretty(message), exc_info=True) | ||||
|                 reply_callback(result=failure.to_dict()) | ||||
|         else: | ||||
|             if isinstance(result, ft.Failure): | ||||
|   | ||||
| @@ -154,7 +154,7 @@ class TestServer(test.MockTestCase): | ||||
|         s = self.server(reset_master_mock=True) | ||||
|         s._reply(True, self.reply_to, self.task_uuid) | ||||
|  | ||||
|         self.assertEqual(self.master_mock.mock_calls, [ | ||||
|         self.master_mock.assert_has_calls([ | ||||
|             mock.call.Response(pr.FAILURE), | ||||
|             mock.call.proxy.publish(self.response_inst_mock, self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
| @@ -195,7 +195,7 @@ class TestServer(test.MockTestCase): | ||||
|             mock.call.proxy.publish(self.response_inst_mock, self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
|         ] | ||||
|         self.assertEqual(master_mock_calls, self.master_mock.mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     def test_process_request(self): | ||||
|         # create server and process request | ||||
| @@ -211,7 +211,7 @@ class TestServer(test.MockTestCase): | ||||
|             mock.call.proxy.publish(self.response_inst_mock, self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     @mock.patch("taskflow.engines.worker_based.server.LOG.warn") | ||||
|     def test_process_request_parse_message_failure(self, mocked_exception): | ||||
| @@ -219,8 +219,6 @@ class TestServer(test.MockTestCase): | ||||
|         request = self.make_request() | ||||
|         s = self.server(reset_master_mock=True) | ||||
|         s._process_request(request, self.message_mock) | ||||
|  | ||||
|         self.assertEqual(self.master_mock.mock_calls, []) | ||||
|         self.assertTrue(mocked_exception.called) | ||||
|  | ||||
|     @mock.patch.object(failure.Failure, 'from_dict') | ||||
| @@ -245,7 +243,7 @@ class TestServer(test.MockTestCase): | ||||
|                                     self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
|         ] | ||||
|         self.assertEqual(master_mock_calls, self.master_mock.mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     @mock.patch.object(failure.Failure, 'to_dict') | ||||
|     def test_process_request_endpoint_not_found(self, to_mock): | ||||
| @@ -266,7 +264,7 @@ class TestServer(test.MockTestCase): | ||||
|                                     self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     @mock.patch.object(failure.Failure, 'to_dict') | ||||
|     def test_process_request_execution_failure(self, to_mock): | ||||
| @@ -288,7 +286,7 @@ class TestServer(test.MockTestCase): | ||||
|                                     self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     @mock.patch.object(failure.Failure, 'to_dict') | ||||
|     def test_process_request_task_failure(self, to_mock): | ||||
| @@ -312,7 +310,7 @@ class TestServer(test.MockTestCase): | ||||
|                                     self.reply_to, | ||||
|                                     correlation_id=self.task_uuid) | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     def test_start(self): | ||||
|         self.server(reset_master_mock=True).start() | ||||
| @@ -321,7 +319,7 @@ class TestServer(test.MockTestCase): | ||||
|         master_mock_calls = [ | ||||
|             mock.call.proxy.start() | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     def test_wait(self): | ||||
|         server = self.server(reset_master_mock=True) | ||||
| @@ -333,7 +331,7 @@ class TestServer(test.MockTestCase): | ||||
|             mock.call.proxy.start(), | ||||
|             mock.call.proxy.wait() | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|  | ||||
|     def test_stop(self): | ||||
|         self.server(reset_master_mock=True).stop() | ||||
| @@ -342,4 +340,4 @@ class TestServer(test.MockTestCase): | ||||
|         master_mock_calls = [ | ||||
|             mock.call.proxy.stop() | ||||
|         ] | ||||
|         self.assertEqual(self.master_mock.mock_calls, master_mock_calls) | ||||
|         self.master_mock.assert_has_calls(master_mock_calls) | ||||
|   | ||||
							
								
								
									
										73
									
								
								taskflow/utils/kombu_utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										73
									
								
								taskflow/utils/kombu_utils.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,73 @@ | ||||
| # -*- coding: utf-8 -*- | ||||
|  | ||||
| #    Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. | ||||
| # | ||||
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may | ||||
| #    not use this file except in compliance with the License. You may obtain | ||||
| #    a copy of the License at | ||||
| # | ||||
| #         http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #    Unless required by applicable law or agreed to in writing, software | ||||
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||
| #    License for the specific language governing permissions and limitations | ||||
| #    under the License. | ||||
|  | ||||
| # Keys extracted from the message properties when formatting... | ||||
| _MSG_PROPERTIES = tuple([ | ||||
|     'correlation_id', | ||||
|     'delivery_info/routing_key', | ||||
|     'type', | ||||
| ]) | ||||
|  | ||||
|  | ||||
| class DelayedPretty(object): | ||||
|     """Wraps a message and delays prettifying it until requested.""" | ||||
|  | ||||
|     def __init__(self, message): | ||||
|         self._message = message | ||||
|         self._message_pretty = None | ||||
|  | ||||
|     def __str__(self): | ||||
|         if self._message_pretty is None: | ||||
|             self._message_pretty = _prettify_message(self._message) | ||||
|         return self._message_pretty | ||||
|  | ||||
|  | ||||
| def _get_deep(properties, *keys): | ||||
|     """Get a final key among a list of keys (each with its own sub-dict).""" | ||||
|     for key in keys: | ||||
|         properties = properties[key] | ||||
|     return properties | ||||
|  | ||||
|  | ||||
| def _prettify_message(message): | ||||
|     """Kombu doesn't currently have a useful ``__str__()`` or ``__repr__()``. | ||||
|  | ||||
|     This provides something decent(ish) for debugging (or other purposes) so | ||||
|     that messages are more nice and understandable.... | ||||
|  | ||||
|     TODO(harlowja): submit something into kombu to fix/adjust this. | ||||
|     """ | ||||
|     if message.content_type is not None: | ||||
|         properties = { | ||||
|             'content_type': message.content_type, | ||||
|         } | ||||
|     else: | ||||
|         properties = {} | ||||
|     for name in _MSG_PROPERTIES: | ||||
|         segments = name.split("/") | ||||
|         try: | ||||
|             value = _get_deep(message.properties, *segments) | ||||
|         except (KeyError, ValueError, TypeError): | ||||
|             pass | ||||
|         else: | ||||
|             if value is not None: | ||||
|                 properties[segments[-1]] = value | ||||
|     if message.body is not None: | ||||
|         properties['body_length'] = len(message.body) | ||||
|     return "%(delivery_tag)s: %(properties)s" % { | ||||
|         'delivery_tag': message.delivery_tag, | ||||
|         'properties': properties, | ||||
|     } | ||||
		Reference in New Issue
	
	Block a user
	 Jenkins
					Jenkins