From 1cf69b9ec647136367d3b6d016375a652d9b269b Mon Sep 17 00:00:00 2001
From: Joshua Harlow
Date: Fri, 21 Feb 2014 17:35:13 -0800
Subject: [PATCH] A few worker-engine cleanups

- Remove usage of % with logs; let the underlying LOG call do the
  interpolation for us if the log level passes.
- Use LOG.exception instead of LOG.warn with the exception included as
  an interpolated variable; let LOG.exception output the exception
  traceback and message itself.
- Capture the failure object before doing any other work, using a new
  context manager that yields back a failure object before further code
  runs (so that a context switch in eventlet, or elsewhere, can not
  clear the exception state before it is saved).

Change-Id: Ic1a504ba69c56ba226b1a067b5ade4590c245dd8
---
 taskflow/engines/worker_based/executor.py     | 36 ++++++-------
 taskflow/engines/worker_based/proxy.py        | 14 ++---
 taskflow/engines/worker_based/server.py       | 53 ++++++++++---------
 .../tests/unit/worker_based/test_executor.py  |  6 +--
 .../tests/unit/worker_based/test_server.py    | 14 +++--
 taskflow/utils/misc.py                        | 30 +++++++++++
 6 files changed, 92 insertions(+), 61 deletions(-)

diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index a3d84a55..5c8c3a66 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -45,7 +45,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
         # TODO(skudriashev): This data should be collected from workers
         # using broadcast messages directly.
         self._workers_info = {}
-        for topic, tasks in workers_info.items():
+        for topic, tasks in six.iteritems(workers_info):
             for task in tasks:
                 self._workers_info[task] = topic

@@ -59,12 +59,12 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):

     def _on_message(self, response, message):
         """This method is called on incoming response."""
-        LOG.debug("Got response: %s" % response)
+        LOG.debug("Got response: %s", response)
         try:
-            # acknowledge message
+            # acknowledge message before processing.
            message.ack()
-        except kombu_exc.MessageStateError as e:
-            LOG.warning("Failed to acknowledge AMQP message: %s" % e)
+        except kombu_exc.MessageStateError:
+            LOG.exception("Failed to acknowledge AMQP message")
         else:
             LOG.debug("AMQP message acknowledged.")
             # get task uuid from message correlation id parameter
@@ -73,7 +73,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
             except KeyError:
                 LOG.warning("Got message with no 'correlation_id' property.")
             else:
-                LOG.debug("Task uuid: '%s'" % task_uuid)
+                LOG.debug("Task uuid: '%s'", task_uuid)
                 self._process_response(task_uuid, response)

     def _process_response(self, task_uuid, response):
@@ -81,7 +81,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
         try:
             task = self._remote_tasks[task_uuid]
         except KeyError:
-            LOG.debug("Task with id='%s' not found." % task_uuid)
+            LOG.debug("Task with id='%s' not found.", task_uuid)
         else:
             state = response.pop('state')
             if state == pr.RUNNING:
@@ -89,15 +89,14 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
             elif state == pr.PROGRESS:
                 task.on_progress(**response)
             elif state == pr.FAILURE:
-                response['result'] = pu.failure_from_dict(
-                    response['result'])
+                response['result'] = pu.failure_from_dict(response['result'])
                 task.set_result(**response)
                 self._remove_remote_task(task)
             elif state == pr.SUCCESS:
                 task.set_result(**response)
                 self._remove_remote_task(task)
             else:
-                LOG.warning("Unexpected response status: '%s'" % state)
+                LOG.warning("Unexpected response status: '%s'", state)

     def _on_wait(self):
         """This function is called cyclically between draining events
@@ -106,9 +105,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
         expired_tasks = [task for task in six.itervalues(self._remote_tasks)
                          if task.expired]
         for task in expired_tasks:
-            LOG.debug("Task request '%s' is expired." % task)
+            LOG.debug("Task request '%s' has expired.", task)
             task.set_result(misc.Failure.from_exception(
-                exc.Timeout("Task request '%s' is expired" % task)))
+                exc.Timeout("Task request '%s' has expired" % task)))
             del self._remote_tasks[task.uuid]

     def _store_remote_task(self, task):
@@ -134,17 +133,18 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
                 topic = self._workers_info[remote_task.name]
             except KeyError:
                 raise exc.NotFound("Workers topic not found for the '%s'"
-                                   "task." % remote_task.name)
+                                   " task" % remote_task.name)
             else:
                 # publish request
                 request = remote_task.request
-                LOG.debug("Sending request: %s" % request)
+                LOG.debug("Sending request: %s", request)
                 self._proxy.publish(request, remote_task.uuid,
                                     routing_key=topic, reply_to=self._uuid)
-        except Exception as e:
-            LOG.error("Failed to submit the '%s' task: %s" % (remote_task, e))
-            self._remove_remote_task(remote_task)
-            remote_task.set_result(misc.Failure())
+        except Exception:
+            with misc.capture_failure() as failure:
+                LOG.exception("Failed to submit the '%s' task", remote_task)
+                self._remove_remote_task(remote_task)
+                remote_task.set_result(failure)
         return remote_task.result

     def execute_task(self, task, task_uuid, arguments,
diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py
index 9dfed320..c61ddf34 100644
--- a/taskflow/engines/worker_based/proxy.py
+++ b/taskflow/engines/worker_based/proxy.py
@@ -84,7 +84,7 @@ class Proxy(object):

     def start(self):
         """Start proxy."""
-        LOG.info("Starting to consume from the '%s' exchange." %
+        LOG.info("Starting to consume from the '%s' exchange.",
                  self._exchange_name)
         with kombu.connections[self._conn].acquire(block=True) as conn:
             queue = self._make_queue(self._topic, self._exchange,
                                      channel=conn)
@@ -104,16 +104,16 @@ class Proxy(object):
                 queue.delete(if_unused=True)
             except (amqp_exc.PreconditionFailed, amqp_exc.NotFound):
                 pass
-            except Exception as e:
-                LOG.error("Failed to delete the '%s' queue: %s" %
-                          (queue.name, e))
+            except Exception:
+                LOG.exception("Failed to delete the '%s' queue",
+                              queue.name)
             try:
                 self._exchange.delete(if_unused=True)
             except (amqp_exc.PreconditionFailed, amqp_exc.NotFound):
                 pass
-            except Exception as e:
-                LOG.error("Failed to delete the '%s' exchange: %s" %
-                          (self._exchange.name, e))
+            except Exception:
+                LOG.exception("Failed to delete the '%s' exchange",
+                              self._exchange.name)

     def wait(self):
         """Wait until proxy is started."""
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index 7b637a56..a554dcde 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -38,7 +38,7 @@ class Server(object):

     def _on_message(self, request, message):
         """This method is called on incoming request."""
-        LOG.debug("Got request: %s" % request)
+        LOG.debug("Got request: %s", request)
         # NOTE(skudriashev): Process all incoming requests only if proxy is
         # running, otherwise reject and requeue them.
         if self._proxy.is_running:
@@ -47,20 +47,20 @@ class Server(object):
             try:
                 # acknowledge message
                 message.ack()
-            except kombu_exc.MessageStateError as e:
-                LOG.warning("Failed to acknowledge AMQP message: %s" % e)
+            except kombu_exc.MessageStateError:
+                LOG.exception("Failed to acknowledge AMQP message")
             else:
-                LOG.debug("AMQP message acknowledged.")
+                LOG.debug("AMQP message acknowledged")
                 # spawn new thread to process request
                 self._executor.submit(self._process_request, request, message)
         else:
             try:
                 # reject and requeue message
                 message.reject(requeue=True)
-            except kombu_exc.MessageStateError as e:
-                LOG.warning("Failed to reject/requeue AMQP message: %s" % e)
+            except kombu_exc.MessageStateError:
+                LOG.exception("Failed to reject/requeue AMQP message")
             else:
-                LOG.debug("AMQP message rejected and requeued.")
+                LOG.debug("AMQP message rejected and requeued")

     @staticmethod
     def _parse_request(task, task_name, action, arguments, result=None,
@@ -93,7 +93,7 @@ class Server(object):
             try:
                 properties.append(message.properties[prop])
             except KeyError:
-                raise ValueError("The '%s' message property is missing." %
+                raise ValueError("The '%s' message property is missing" %
                                  prop)
         return properties

@@ -101,11 +101,11 @@ class Server(object):
     def _reply(self, reply_to, task_uuid, state=pr.FAILURE, **kwargs):
         """Send reply to the `reply_to` queue."""
         response = dict(state=state, **kwargs)
-        LOG.debug("Send reply: %s" % response)
+        LOG.debug("Sending reply: %s", response)
         try:
             self._proxy.publish(response, task_uuid, reply_to)
-        except Exception as e:
-            LOG.error("Failed to send reply: %s" % e)
+        except Exception:
+            LOG.exception("Failed to send reply")

     def _on_update_progress(self, reply_to, task_uuid, task, event_data,
                             progress):
@@ -115,12 +115,12 @@ class Server(object):

     def _process_request(self, request, message):
         """Process request in separate thread and reply back."""
-        # parse broker message first to get the `reply_to` and the `task_uuid`
-        # parameters to have possibility to reply back
+        # NOTE(skudriashev): parse broker message first to get the `reply_to`
+        # and the `task_uuid` parameters to have possibility to reply back.
         try:
             reply_to, task_uuid = self._parse_message(message)
-        except ValueError as e:
-            LOG.error("Failed to parse broker message: %s" % e)
+        except ValueError:
+            LOG.exception("Failed to parse broker message")
             return
         else:
             # prepare task progress callback
@@ -135,27 +135,30 @@ class Server(object):
                 task, action, action_args = self._parse_request(**request)
                 action_args.update(task_uuid=task_uuid,
                                    progress_callback=progress_callback)
-            except ValueError as e:
-                LOG.error("Failed to parse request: %s" % e)
-                reply_callback(result=pu.failure_to_dict(misc.Failure()))
-                return
+            except ValueError:
+                with misc.capture_failure() as failure:
+                    LOG.exception("Failed to parse request")
+                    reply_callback(result=pu.failure_to_dict(failure))
+                    return
             # get task endpoint
             try:
                 endpoint = self._endpoints[task]
             except KeyError:
-                LOG.error("The '%s' task endpoint does not exist." % task)
-                reply_callback(result=pu.failure_to_dict(misc.Failure()))
-                return
+                with misc.capture_failure() as failure:
+                    LOG.exception("The '%s' task endpoint does not exist", task)
+                    reply_callback(result=pu.failure_to_dict(failure))
+                    return
             else:
                 reply_callback(state=pr.RUNNING)
             # perform task action
             try:
                 result = getattr(endpoint, action)(**action_args)
-            except Exception as e:
-                LOG.error("The %s task execution failed: %s" % (endpoint, e))
-                reply_callback(result=pu.failure_to_dict(misc.Failure()))
+            except Exception:
+                with misc.capture_failure() as failure:
+                    LOG.exception("The %s task execution failed", endpoint)
+                    reply_callback(result=pu.failure_to_dict(failure))
             else:
                 if isinstance(result, misc.Failure):
                     reply_callback(result=pu.failure_to_dict(result))
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py
index fbb61590..25449846 100644
--- a/taskflow/tests/unit/worker_based/test_executor.py
+++ b/taskflow/tests/unit/worker_based/test_executor.py
@@ -177,11 +177,11 @@ class TestWorkerTaskExecutor(test.MockTestCase):
         self.assertEqual(self.remote_task_mock.mock_calls, [])
         self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()])

-    @mock.patch('taskflow.engines.worker_based.executor.LOG.warning')
-    def test_on_message_acknowledge_raises(self, mocked_warning):
+    @mock.patch('taskflow.engines.worker_based.executor.LOG.exception')
+    def test_on_message_acknowledge_raises(self, mocked_exception):
         self.message_mock.ack.side_effect = kombu_exc.MessageStateError()
         self.executor()._on_message({}, self.message_mock)
-        self.assertTrue(mocked_warning.called)
+        self.assertTrue(mocked_exception.called)

     @mock.patch('taskflow.engines.worker_based.remote_task.misc.wallclock')
     def test_on_wait_task_not_expired(self, mock_time):
diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py
index 4a5326e6..34af585a 100644
--- a/taskflow/tests/unit/worker_based/test_server.py
+++ b/taskflow/tests/unit/worker_based/test_server.py
@@ -210,8 +210,8 @@ class TestServer(test.MockTestCase):
             failures=dict((str(i), utils.FailureMatcher(f))
                           for i, f in enumerate(failures)))))

-    @mock.patch("taskflow.engines.worker_based.server.LOG.error")
-    def test_reply_publish_failure(self, mocked_error):
+    @mock.patch("taskflow.engines.worker_based.server.LOG.exception")
+    def test_reply_publish_failure(self, mocked_exception):
         self.proxy_inst_mock.publish.side_effect = RuntimeError('Woot!')

         # create server and process request
@@ -222,9 +222,7 @@ class TestServer(test.MockTestCase):
             mock.call.proxy.publish({'state': 'FAILURE'}, self.task_uuid,
                                     self.reply_to)
         ])
-        self.assertEqual(mocked_error.mock_calls, [
-            mock.call("Failed to send reply: Woot!")
-        ])
+        self.assertTrue(mocked_exception.called)

     def test_on_update_progress(self):
         request = self.request(task='taskflow.tests.utils.ProgressingTask',
@@ -261,15 +259,15 @@ class TestServer(test.MockTestCase):
         ]
         self.assertEqual(self.master_mock.mock_calls, master_mock_calls)

-    @mock.patch("taskflow.engines.worker_based.server.LOG.error")
-    def test_process_request_parse_message_failure(self, mocked_error):
+    @mock.patch("taskflow.engines.worker_based.server.LOG.exception")
+    def test_process_request_parse_message_failure(self, mocked_exception):
         self.message_mock.properties = {}
         request = self.request()
         s = self.server(reset_master_mock=True)

         s._process_request(request, self.message_mock)
         self.assertEqual(self.master_mock.mock_calls, [])
-        self.assertTrue(mocked_error.called)
+        self.assertTrue(mocked_exception.called)

     @mock.patch('taskflow.engines.worker_based.server.pu')
     def test_process_request_parse_failure(self, pu_mock):
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index dd22e1e0..7d6a5f66 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -16,6 +16,7 @@
 # under the License.

 import collections
+import contextlib
 import copy
 import errno
 import functools
@@ -452,6 +453,35 @@ def are_equal_exc_info_tuples(ei1, ei2):
     return tb1 == tb2


+@contextlib.contextmanager
+def capture_failure():
+    """Save the current exception and yield back a failure object (or raise
+    a RuntimeError if no active exception is being handled).
+
+    In some cases the exception context can be cleared before the failure
+    is saved, so that None would be saved instead. This can happen when
+    eventlet switches greenthreads, or when an exception handler itself
+    raises and then catches another exception. In both cases the active
+    exception state is lost.
+
+    To work around this, we save the exception state up front, yield a
+    failure and only then run other code.
+
+    For example::
+
+        except Exception:
+            with capture_failure() as fail:
+                LOG.warn("Activating cleanup")
+                cleanup()
+                save_failure(fail)
+    """
+    exc_info = sys.exc_info()
+    if not any(exc_info):
+        raise RuntimeError("No active exception is being handled")
+    else:
+        yield Failure(exc_info=exc_info)
+
+
 class Failure(object):
     """Object that represents failure.
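
A standalone sketch of the capture-the-failure-first pattern this change introduces
(illustrative only, not part of the patch; the minimal Failure class and the
cleanup() helper below are hypothetical stand-ins for the real objects in
taskflow.utils.misc):

import contextlib
import sys


class Failure(object):
    """Minimal stand-in for taskflow.utils.misc.Failure."""

    def __init__(self, exc_info):
        self.exc_info = exc_info

    def __str__(self):
        return "Failure: %r" % (self.exc_info[1],)


@contextlib.contextmanager
def capture_failure():
    """Grab sys.exc_info() before any further code can clear it."""
    exc_info = sys.exc_info()
    if not any(exc_info):
        raise RuntimeError("No active exception is being handled")
    yield Failure(exc_info=exc_info)


def cleanup():
    # A handler that raises and catches its own exception; this is the kind
    # of code (like eventlet greenthread switches) that can clobber the
    # outer exception state, which is why the failure is captured up front.
    try:
        raise ValueError("cleanup hiccup")
    except ValueError:
        pass


try:
    raise IOError("boom")
except Exception:
    with capture_failure() as fail:
        cleanup()
        print(fail)  # still reports the original IOError, not the ValueError

This mirrors how the executor and server now wrap their cleanup and reply
calls inside "with misc.capture_failure() as failure:" before passing the
failure along.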