Increase robustness of WBE message and request processing

When a notification request/response can't be processed ensure
we log an error message at the same level as the other function
that sends back responses.

Also adds in a return boolean from the _reply message (which
is used for the X number of replies to a servers task request)
function and use this boolean to determine if the worker should
attempt to perform the final handler call that activates the
desired task.

Change-Id: I7f3914c126f39c56d0d2e3dfe02f3b112391ff43
This commit is contained in:
Joshua Harlow
2014-09-11 17:46:26 -07:00
parent 28b2f8fb1b
commit dc688c18f1
2 changed files with 54 additions and 19 deletions

View File

@@ -57,7 +57,6 @@ class Server(object):
self._proxy = proxy.Proxy(topic, exchange, handlers,
on_wait=None, **kwargs)
self._topic = topic
self._executor = executor
self._endpoints = dict([(endpoint.name, endpoint)
for endpoint in endpoints])
@@ -102,21 +101,34 @@ class Server(object):
prop)
return properties
def _reply(self, reply_to, task_uuid, state=pr.FAILURE, **kwargs):
"""Send reply to the `reply_to` queue."""
def _reply(self, capture, reply_to, task_uuid, state=pr.FAILURE, **kwargs):
"""Send a reply to the `reply_to` queue with the given information.
Can capture failures to publish and if capturing will log associated
critical errors on behalf of the caller, and then returns whether the
publish worked out or did not.
"""
response = pr.Response(state, **kwargs)
published = False
try:
self._proxy.publish(response, reply_to, correlation_id=task_uuid)
published = True
except Exception:
if not capture:
raise
LOG.critical("Failed to send reply to '%s' for task '%s' with"
" response %s", reply_to, task_uuid, response,
exc_info=True)
return published
def _on_update_progress(self, reply_to, task_uuid, task, event_data,
progress):
"""Send task update progress notification."""
self._reply(reply_to, task_uuid, pr.PROGRESS, event_data=event_data,
progress=progress)
# NOTE(harlowja): the executor that will trigger this using the
# task notification/listener mechanism will handle logging if this
# fails, so thats why capture is 'False' is used here.
self._reply(False, reply_to, task_uuid, pr.PROGRESS,
event_data=event_data, progress=progress)
def _process_notify(self, notify, message):
"""Process notify message and reply back."""
@@ -128,10 +140,14 @@ class Server(object):
" in received notify message %r", message.delivery_tag,
exc_info=True)
else:
self._proxy.publish(
msg=pr.Notify(topic=self._topic, tasks=self._endpoints.keys()),
routing_key=reply_to
)
response = pr.Notify(topic=self._topic,
tasks=self._endpoints.keys())
try:
self._proxy.publish(response, routing_key=reply_to)
except Exception:
LOG.critical("Failed to send reply to '%s' with notify"
" response %s", reply_to, response,
exc_info=True)
def _process_request(self, request, message):
"""Process request message and reply back."""
@@ -149,11 +165,11 @@ class Server(object):
return
else:
# prepare task progress callback
progress_callback = functools.partial(
self._on_update_progress, reply_to, task_uuid)
progress_callback = functools.partial(self._on_update_progress,
reply_to, task_uuid)
# prepare reply callback
reply_callback = functools.partial(
self._reply, reply_to, task_uuid)
reply_callback = functools.partial(self._reply, True, reply_to,
task_uuid)
# parse request to get task name, action and action arguments
try:
@@ -178,11 +194,23 @@ class Server(object):
reply_callback(result=failure.to_dict())
return
else:
reply_callback(state=pr.RUNNING)
try:
handler = getattr(endpoint, action)
except AttributeError:
with misc.capture_failure() as failure:
LOG.warn("The '%s' handler does not exist on task endpoint"
" '%s', unable to continue processing request"
" message %r", action, endpoint,
message.delivery_tag, exc_info=True)
reply_callback(result=failure.to_dict())
return
else:
if not reply_callback(state=pr.RUNNING):
return
# perform task action
try:
result = getattr(endpoint, action)(**action_args)
result = handler(**action_args)
except Exception:
with misc.capture_failure() as failure:
LOG.warn("The '%s' endpoint '%s' execution for request"

View File

@@ -151,7 +151,7 @@ class TestServer(test.MockTestCase):
# create server and process request
s = self.server(reset_master_mock=True)
s._reply(self.reply_to, self.task_uuid)
s._reply(True, self.reply_to, self.task_uuid)
self.assertEqual(self.master_mock.mock_calls, [
mock.call.Response(pr.FAILURE),
@@ -160,6 +160,16 @@ class TestServer(test.MockTestCase):
])
self.assertTrue(mocked_exception.called)
def test_on_run_reply_failure(self):
request = self.make_request(task=utils.ProgressingTask(), arguments={})
self.proxy_inst_mock.publish.side_effect = RuntimeError('Woot!')
# create server and process request
s = self.server(reset_master_mock=True)
s._process_request(request, self.message_mock)
self.assertEqual(1, self.proxy_inst_mock.publish.call_count)
def test_on_update_progress(self):
request = self.make_request(task=utils.ProgressingTask(), arguments={})
@@ -270,9 +280,6 @@ class TestServer(test.MockTestCase):
# check calls
master_mock_calls = [
mock.call.Response(pr.RUNNING),
mock.call.proxy.publish(self.response_inst_mock, self.reply_to,
correlation_id=self.task_uuid),
mock.call.Response(pr.FAILURE, result=failure_dict),
mock.call.proxy.publish(self.response_inst_mock,
self.reply_to,