Handle cases where exc_args can't be serialized as JSON in the WBE
First try to get them, but if they aren't able to be serialized, just return the failure without them. This only affects the WBE that serializes failures as part of the JSON response that is sent over the wire from the worker process to the WBE engine. Change-Id: I86e3255b612bc15097aabe63a684cf8b8808e61b
This commit is contained in:
@@ -20,6 +20,7 @@ import threading
|
||||
|
||||
import fasteners
|
||||
import futurist
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import reflection
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
@@ -100,6 +101,17 @@ RESPONSE = 'RESPONSE'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def failure_to_dict(failure):
|
||||
failure_dict = failure.to_dict()
|
||||
try:
|
||||
# it's possible the exc_args can't be serialized as JSON
|
||||
# if that's the case, just get the failure without them
|
||||
jsonutils.dumps(failure_dict)
|
||||
return failure_dict
|
||||
except (TypeError, ValueError):
|
||||
return failure.to_dict(include_args=False)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Message(object):
|
||||
"""Base class for all message types."""
|
||||
@@ -301,6 +313,7 @@ class Request(Message):
|
||||
convert all `failure.Failure` objects into dictionaries (which will
|
||||
then be reconstituted by the receiver).
|
||||
"""
|
||||
|
||||
request = {
|
||||
'task_cls': reflection.get_class_name(self._task),
|
||||
'task_name': self._task.name,
|
||||
@@ -311,14 +324,14 @@ class Request(Message):
|
||||
if 'result' in self._kwargs:
|
||||
result = self._kwargs['result']
|
||||
if isinstance(result, ft.Failure):
|
||||
request['result'] = ('failure', result.to_dict())
|
||||
request['result'] = ('failure', failure_to_dict(result))
|
||||
else:
|
||||
request['result'] = ('success', result)
|
||||
if 'failures' in self._kwargs:
|
||||
failures = self._kwargs['failures']
|
||||
request['failures'] = {}
|
||||
for task, failure in six.iteritems(failures):
|
||||
request['failures'][task] = failure.to_dict()
|
||||
request['failures'][task] = failure_to_dict(failure)
|
||||
return request
|
||||
|
||||
def set_result(self, result):
|
||||
|
||||
@@ -179,7 +179,7 @@ class Server(object):
|
||||
with misc.capture_failure() as failure:
|
||||
LOG.warn("Failed to parse request contents from message '%s'",
|
||||
ku.DelayedPretty(message), exc_info=True)
|
||||
reply_callback(result=failure.to_dict())
|
||||
reply_callback(result=pr.failure_to_dict(failure))
|
||||
return
|
||||
|
||||
# Now fetch the task endpoint (and action handler on it).
|
||||
@@ -191,7 +191,7 @@ class Server(object):
|
||||
" to continue processing request message '%s'",
|
||||
work.task_cls, ku.DelayedPretty(message),
|
||||
exc_info=True)
|
||||
reply_callback(result=failure.to_dict())
|
||||
reply_callback(result=pr.failure_to_dict(failure))
|
||||
return
|
||||
else:
|
||||
try:
|
||||
@@ -202,7 +202,7 @@ class Server(object):
|
||||
" '%s', unable to continue processing request"
|
||||
" message '%s'", work.action, endpoint,
|
||||
ku.DelayedPretty(message), exc_info=True)
|
||||
reply_callback(result=failure.to_dict())
|
||||
reply_callback(result=pr.failure_to_dict(failure))
|
||||
return
|
||||
else:
|
||||
try:
|
||||
@@ -212,7 +212,7 @@ class Server(object):
|
||||
LOG.warn("The '%s' task '%s' generation for request"
|
||||
" message '%s' failed", endpoint, work.action,
|
||||
ku.DelayedPretty(message), exc_info=True)
|
||||
reply_callback(result=failure.to_dict())
|
||||
reply_callback(result=pr.failure_to_dict(failure))
|
||||
return
|
||||
else:
|
||||
if not reply_callback(state=pr.RUNNING):
|
||||
@@ -240,7 +240,7 @@ class Server(object):
|
||||
LOG.warn("The '%s' endpoint '%s' execution for request"
|
||||
" message '%s' failed", endpoint, work.action,
|
||||
ku.DelayedPretty(message), exc_info=True)
|
||||
reply_callback(result=failure.to_dict())
|
||||
reply_callback(result=pr.failure_to_dict(failure))
|
||||
else:
|
||||
# And be done with it!
|
||||
if isinstance(result, ft.Failure):
|
||||
|
||||
@@ -283,6 +283,16 @@ class FailureObjectTestCase(test.TestCase):
|
||||
text = captured.pformat(traceback=True)
|
||||
self.assertIn("Traceback (most recent call last):", text)
|
||||
|
||||
def test_no_capture_exc_args(self):
|
||||
captured = _captured_failure(Exception("I am not valid JSON"))
|
||||
fail_obj = failure.Failure(exception_str=captured.exception_str,
|
||||
traceback_str=captured.traceback_str,
|
||||
exc_type_names=list(captured),
|
||||
exc_args=list(captured.exception_args))
|
||||
fail_json = fail_obj.to_dict(include_args=False)
|
||||
self.assertNotEqual(fail_obj.exception_args, fail_json['exc_args'])
|
||||
self.assertEqual(fail_json['exc_args'], tuple())
|
||||
|
||||
|
||||
class WrappedFailureTestCase(test.TestCase):
|
||||
|
||||
|
||||
@@ -162,6 +162,14 @@ class TestProtocol(test.TestCase):
|
||||
failures={self.task.name: a_failure.to_dict()})
|
||||
self.assertEqual(expected, request.to_dict())
|
||||
|
||||
def test_to_dict_with_invalid_json_failures(self):
|
||||
exc = RuntimeError(Exception("I am not valid JSON"))
|
||||
a_failure = failure.Failure.from_exception(exc)
|
||||
request = self.request(failures={self.task.name: a_failure})
|
||||
expected = self.request_to_dict(
|
||||
failures={self.task.name: a_failure.to_dict(include_args=False)})
|
||||
self.assertEqual(expected, request.to_dict())
|
||||
|
||||
@mock.patch('oslo_utils.timeutils.now')
|
||||
def test_pending_not_expired(self, now):
|
||||
now.return_value = 0
|
||||
|
||||
@@ -499,14 +499,18 @@ class Failure(mixins.StrMixin):
|
||||
data['causes'] = tuple(cls.from_dict(d) for d in causes)
|
||||
return cls(**data)
|
||||
|
||||
def to_dict(self):
|
||||
"""Converts this object to a dictionary."""
|
||||
def to_dict(self, include_args=True):
|
||||
"""Converts this object to a dictionary.
|
||||
|
||||
:param include_args: boolean indicating whether to include the
|
||||
exception args in the output.
|
||||
"""
|
||||
return {
|
||||
'exception_str': self.exception_str,
|
||||
'traceback_str': self.traceback_str,
|
||||
'exc_type_names': list(self),
|
||||
'version': self.DICT_VERSION,
|
||||
'exc_args': self.exception_args,
|
||||
'exc_args': self.exception_args if include_args else tuple(),
|
||||
'causes': [f.to_dict() for f in self.causes],
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user