From dc688c18f13a9527ffce4d0175aa6a61450278af Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 11 Sep 2014 17:46:26 -0700 Subject: [PATCH] Increase robustness of WBE message and request processing When a notification request/response can't be processed ensure we log an error message at the same level as the other function that sends back responses. Also adds a boolean return value to the _reply function (which is used for the X number of replies to a server's task request) and uses this boolean to determine if the worker should attempt to perform the final handler call that activates the desired task. Change-Id: I7f3914c126f39c56d0d2e3dfe02f3b112391ff43 --- taskflow/engines/worker_based/server.py | 58 ++++++++++++++----- .../tests/unit/worker_based/test_server.py | 15 +++-- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index f1a6b703..9440c96a 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -57,7 +57,6 @@ class Server(object): self._proxy = proxy.Proxy(topic, exchange, handlers, on_wait=None, **kwargs) self._topic = topic - self._executor = executor self._endpoints = dict([(endpoint.name, endpoint) for endpoint in endpoints]) @@ -102,21 +101,34 @@ class Server(object): prop) return properties - def _reply(self, reply_to, task_uuid, state=pr.FAILURE, **kwargs): - """Send reply to the `reply_to` queue.""" + def _reply(self, capture, reply_to, task_uuid, state=pr.FAILURE, **kwargs): + """Send a reply to the `reply_to` queue with the given information. + + Can capture failures to publish and if capturing will log associated + critical errors on behalf of the caller, and then returns whether the + publish worked out or did not. 
+ """ response = pr.Response(state, **kwargs) + published = False try: self._proxy.publish(response, reply_to, correlation_id=task_uuid) + published = True except Exception: + if not capture: + raise LOG.critical("Failed to send reply to '%s' for task '%s' with" " response %s", reply_to, task_uuid, response, exc_info=True) + return published def _on_update_progress(self, reply_to, task_uuid, task, event_data, progress): """Send task update progress notification.""" - self._reply(reply_to, task_uuid, pr.PROGRESS, event_data=event_data, - progress=progress) + # NOTE(harlowja): the executor that will trigger this using the + # task notification/listener mechanism will handle logging if this + # fails, so that's why capture is 'False' here. + self._reply(False, reply_to, task_uuid, pr.PROGRESS, + event_data=event_data, progress=progress) def _process_notify(self, notify, message): """Process notify message and reply back.""" @@ -128,10 +140,14 @@ class Server(object): " in received notify message %r", message.delivery_tag, exc_info=True) else: - self._proxy.publish( - msg=pr.Notify(topic=self._topic, tasks=self._endpoints.keys()), - routing_key=reply_to - ) + response = pr.Notify(topic=self._topic, + tasks=self._endpoints.keys()) + try: + self._proxy.publish(response, routing_key=reply_to) + except Exception: + LOG.critical("Failed to send reply to '%s' with notify" + " response %s", reply_to, response, + exc_info=True) def _process_request(self, request, message): """Process request message and reply back.""" @@ -149,11 +165,11 @@ class Server(object): return else: # prepare task progress callback - progress_callback = functools.partial( - self._on_update_progress, reply_to, task_uuid) + progress_callback = functools.partial(self._on_update_progress, + reply_to, task_uuid) # prepare reply callback - reply_callback = functools.partial( - self._reply, reply_to, task_uuid) + reply_callback = functools.partial(self._reply, True, reply_to, + task_uuid) # parse 
request to get task name, action and action arguments try: @@ -178,11 +194,23 @@ class Server(object): reply_callback(result=failure.to_dict()) return else: - reply_callback(state=pr.RUNNING) + try: + handler = getattr(endpoint, action) + except AttributeError: + with misc.capture_failure() as failure: + LOG.warn("The '%s' handler does not exist on task endpoint" + " '%s', unable to continue processing request" + " message %r", action, endpoint, + message.delivery_tag, exc_info=True) + reply_callback(result=failure.to_dict()) + return + else: + if not reply_callback(state=pr.RUNNING): + return # perform task action try: - result = getattr(endpoint, action)(**action_args) + result = handler(**action_args) except Exception: with misc.capture_failure() as failure: LOG.warn("The '%s' endpoint '%s' execution for request" diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 7544b5c6..36d18f4f 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -151,7 +151,7 @@ class TestServer(test.MockTestCase): # create server and process request s = self.server(reset_master_mock=True) - s._reply(self.reply_to, self.task_uuid) + s._reply(True, self.reply_to, self.task_uuid) self.assertEqual(self.master_mock.mock_calls, [ mock.call.Response(pr.FAILURE), @@ -160,6 +160,16 @@ class TestServer(test.MockTestCase): ]) self.assertTrue(mocked_exception.called) + def test_on_run_reply_failure(self): + request = self.make_request(task=utils.ProgressingTask(), arguments={}) + self.proxy_inst_mock.publish.side_effect = RuntimeError('Woot!') + + # create server and process request + s = self.server(reset_master_mock=True) + s._process_request(request, self.message_mock) + + self.assertEqual(1, self.proxy_inst_mock.publish.call_count) + def test_on_update_progress(self): request = self.make_request(task=utils.ProgressingTask(), arguments={}) @@ -270,9 +280,6 @@ class 
TestServer(test.MockTestCase): # check calls master_mock_calls = [ - mock.call.Response(pr.RUNNING), - mock.call.proxy.publish(self.response_inst_mock, self.reply_to, - correlation_id=self.task_uuid), mock.call.Response(pr.FAILURE, result=failure_dict), mock.call.proxy.publish(self.response_inst_mock, self.reply_to,