Remove logging from serialize_remote_exception
This patch removes log_failure argument from the function serialize_remote_exception and from driver implementations using it (because it is never used and always defaults to True) and prevents error logging in this function (because these errors are already logged by servers while processing incoming messages). Change-Id: Ic01bb11d6c4f018a17f3219cdbd07ef4d30fa434 Closes-Bug: 1580352
This commit is contained in:
parent
b8cafee601
commit
39749c77a8
@ -94,11 +94,10 @@ class ListenTask(controller.Task):
|
||||
class ReplyTask(controller.Task):
|
||||
"""A task that sends 'response' message to 'address'.
|
||||
"""
|
||||
def __init__(self, address, response, log_failure):
|
||||
def __init__(self, address, response):
|
||||
super(ReplyTask, self).__init__()
|
||||
self._address = address
|
||||
self._response = response
|
||||
self._log_failure = log_failure
|
||||
self._wakeup = threading.Event()
|
||||
|
||||
def wait(self):
|
||||
|
@ -50,14 +50,13 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
self.stopwatch = timeutils.StopWatch()
|
||||
self.stopwatch.start()
|
||||
|
||||
def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
|
||||
def _send_reply(self, conn, reply=None, failure=None):
|
||||
if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
|
||||
self.msg_id):
|
||||
return
|
||||
|
||||
if failure:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
failure = rpc_common.serialize_remote_exception(failure)
|
||||
# NOTE(sileht): ending can be removed in N*, see Listener.wait()
|
||||
# for more detail.
|
||||
msg = {'result': reply, 'failure': failure, 'ending': True,
|
||||
@ -74,7 +73,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
'elapsed': self.stopwatch.elapsed()})
|
||||
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if not self.msg_id:
|
||||
# NOTE(Alexei_987) not sending reply, if msg_id is empty
|
||||
# because reply should not be expected by caller side
|
||||
@ -96,8 +95,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
try:
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
self._send_reply(conn, reply, failure,
|
||||
log_failure=log_failure)
|
||||
self._send_reply(conn, reply, failure)
|
||||
return
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
if timer.check_return() > 0:
|
||||
|
@ -92,7 +92,7 @@ class IncomingMessage(object):
|
||||
class RpcIncomingMessage(IncomingMessage):
|
||||
|
||||
@abc.abstractmethod
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Send a reply or failure back to the client."""
|
||||
|
||||
|
||||
|
@ -162,18 +162,15 @@ class Connection(object):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def serialize_remote_exception(failure_info, log_failure=True):
|
||||
def serialize_remote_exception(failure_info):
|
||||
"""Prepares exception data to be sent over rpc.
|
||||
|
||||
Failure_info should be a sys.exc_info() tuple.
|
||||
|
||||
"""
|
||||
tb = traceback.format_exception(*failure_info)
|
||||
|
||||
failure = failure_info[1]
|
||||
if log_failure:
|
||||
LOG.error(_LE("Returning exception %s to caller"),
|
||||
six.text_type(failure))
|
||||
LOG.error(tb)
|
||||
|
||||
kwargs = {}
|
||||
if hasattr(failure, 'kwargs'):
|
||||
|
@ -98,13 +98,13 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
|
||||
self._reply_to = message.reply_to
|
||||
self._correlation_id = message.id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Schedule a ReplyTask to send the reply."""
|
||||
if self._reply_to:
|
||||
response = marshal_response(reply=reply, failure=failure)
|
||||
response.correlation_id = self._correlation_id
|
||||
LOG.debug("Replying to %s", self._correlation_id)
|
||||
task = drivertasks.ReplyTask(self._reply_to, response, log_failure)
|
||||
task = drivertasks.ReplyTask(self._reply_to, response)
|
||||
self.listener.driver._ctrl.add_task(task)
|
||||
else:
|
||||
LOG.debug("Ignoring reply as no reply address available")
|
||||
|
@ -30,7 +30,7 @@ class FakeIncomingMessage(base.RpcIncomingMessage):
|
||||
self.requeue_callback = requeue
|
||||
self._reply_q = reply_q
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if self._reply_q:
|
||||
failure = failure[1] if failure else None
|
||||
self._reply_q.put((reply, failure))
|
||||
|
@ -240,7 +240,7 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
|
||||
def requeue(self):
|
||||
LOG.warning(_LW("requeue is not supported"))
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
LOG.warning(_LW("reply is not supported"))
|
||||
|
||||
|
||||
|
@ -175,13 +175,11 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
|
||||
self.reply_q = properties.reply_to
|
||||
self.msg_id = properties.correlation_id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Send back reply to the RPC client
|
||||
:param reply: Dictionary, reply. In case of exception should be None
|
||||
:param failure: Tuple, should be a sys.exc_info() tuple.
|
||||
Should be None if RPC request was successfully processed.
|
||||
:param log_failure: Boolean, not used in this implementation.
|
||||
It present here to be compatible with driver API
|
||||
|
||||
:return RpcReplyPikaIncomingMessage, message with reply
|
||||
"""
|
||||
|
@ -18,8 +18,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
class Response(object):
|
||||
|
||||
def __init__(self, id=None, type=None, message_id=None,
|
||||
reply_id=None, reply_body=None,
|
||||
failure=None, log_failure=None):
|
||||
reply_id=None, reply_body=None, failure=None):
|
||||
|
||||
self._id = id
|
||||
self._type = type
|
||||
@ -27,7 +26,6 @@ class Response(object):
|
||||
self._reply_id = reply_id
|
||||
self._reply_body = reply_body
|
||||
self._failure = failure
|
||||
self._log_failure = log_failure
|
||||
|
||||
@property
|
||||
def id_(self):
|
||||
@ -53,18 +51,13 @@ class Response(object):
|
||||
def failure(self):
|
||||
return self._failure
|
||||
|
||||
@property
|
||||
def log_failure(self):
|
||||
return self._log_failure
|
||||
|
||||
def to_dict(self):
|
||||
return {zmq_names.FIELD_ID: self._id,
|
||||
zmq_names.FIELD_TYPE: self._type,
|
||||
zmq_names.FIELD_MSG_ID: self._message_id,
|
||||
zmq_names.FIELD_REPLY_ID: self._reply_id,
|
||||
zmq_names.FIELD_REPLY: self._reply_body,
|
||||
zmq_names.FIELD_FAILURE: self._failure,
|
||||
zmq_names.FIELD_LOG_FAILURE: self._log_failure}
|
||||
zmq_names.FIELD_FAILURE: self._failure}
|
||||
|
||||
def __str__(self):
|
||||
return str(self.to_dict())
|
||||
|
@ -37,7 +37,7 @@ class DealerIncomingMessage(base.RpcIncomingMessage):
|
||||
def __init__(self, context, message):
|
||||
super(DealerIncomingMessage, self).__init__(context, message)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages"""
|
||||
|
||||
def acknowledge(self):
|
||||
@ -55,16 +55,14 @@ class DealerIncomingRequest(base.RpcIncomingMessage):
|
||||
self.reply_id = reply_id
|
||||
self.message_id = message_id
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if failure is not None:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
failure = rpc_common.serialize_remote_exception(failure)
|
||||
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
|
||||
message_id=self.message_id,
|
||||
reply_id=self.reply_id,
|
||||
reply_body=reply,
|
||||
failure=failure,
|
||||
log_failure=log_failure)
|
||||
failure=failure)
|
||||
|
||||
LOG.debug("Replying %s", self.message_id)
|
||||
|
||||
|
@ -31,7 +31,7 @@ class PullIncomingMessage(base.RpcIncomingMessage):
|
||||
def __init__(self, context, message):
|
||||
super(PullIncomingMessage, self).__init__(context, message)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages."""
|
||||
|
||||
def acknowledge(self):
|
||||
|
@ -37,7 +37,7 @@ class RouterIncomingMessage(base.RpcIncomingMessage):
|
||||
self.msg_id = msg_id
|
||||
self.message = message
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages"""
|
||||
|
||||
def acknowledge(self):
|
||||
|
@ -34,7 +34,7 @@ class SubIncomingMessage(base.RpcIncomingMessage):
|
||||
def __init__(self, context, message):
|
||||
super(SubIncomingMessage, self).__init__(context, message)
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
"""Reply is not needed for non-call messages."""
|
||||
|
||||
def acknowledge(self):
|
||||
|
@ -39,16 +39,14 @@ class ZmqIncomingRequest(base.RpcIncomingMessage):
|
||||
self.received = None
|
||||
self.poller = poller
|
||||
|
||||
def reply(self, reply=None, failure=None, log_failure=True):
|
||||
def reply(self, reply=None, failure=None):
|
||||
if failure is not None:
|
||||
failure = rpc_common.serialize_remote_exception(failure,
|
||||
log_failure)
|
||||
failure = rpc_common.serialize_remote_exception(failure)
|
||||
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
|
||||
message_id=self.request.message_id,
|
||||
reply_id=self.reply_id,
|
||||
reply_body=reply,
|
||||
failure=failure,
|
||||
log_failure=log_failure)
|
||||
failure=failure)
|
||||
|
||||
LOG.debug("Replying %s", (str(self.request.message_id)))
|
||||
|
||||
|
@ -20,7 +20,6 @@ zmq = zmq_async.import_zmq()
|
||||
FIELD_TYPE = 'type'
|
||||
FIELD_FAILURE = 'failure'
|
||||
FIELD_REPLY = 'reply'
|
||||
FIELD_LOG_FAILURE = 'log_failure'
|
||||
FIELD_ID = 'id'
|
||||
FIELD_MSG_ID = 'message_id'
|
||||
FIELD_MSG_TYPE = 'msg_type'
|
||||
|
@ -132,15 +132,14 @@ class RPCServer(msg_server.MessageHandlingServer):
|
||||
try:
|
||||
res = self.dispatcher.dispatch(message)
|
||||
except rpc_dispatcher.ExpectedException as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)',
|
||||
e.exc_info[1])
|
||||
failure = e.exc_info
|
||||
except Exception as e:
|
||||
LOG.debug(u'Expected exception during message handling (%s)', e)
|
||||
except Exception:
|
||||
# current sys.exc_info() content can be overriden
|
||||
# by another exception raise by a log handler during
|
||||
# by another exception raised by a log handler during
|
||||
# LOG.exception(). So keep a copy and delete it later.
|
||||
failure = sys.exc_info()
|
||||
LOG.exception(_LE('Exception during handling message'))
|
||||
LOG.exception(_LE('Exception during message handling'))
|
||||
|
||||
try:
|
||||
if failure is None:
|
||||
|
@ -451,14 +451,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
senders = []
|
||||
replies = []
|
||||
msgs = []
|
||||
errors = []
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(driver_common.LOG, 'error', stub_error)
|
||||
|
||||
def send_and_wait_for_reply(i):
|
||||
try:
|
||||
@ -500,8 +492,7 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
raise ZeroDivisionError
|
||||
except Exception:
|
||||
failure = sys.exc_info()
|
||||
msgs[i].reply(failure=failure,
|
||||
log_failure=not self.expected)
|
||||
msgs[i].reply(failure=failure)
|
||||
elif self.rx_id:
|
||||
msgs[i].reply({'rx_id': i})
|
||||
else:
|
||||
@ -519,11 +510,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
else:
|
||||
self.assertEqual(self.reply, reply)
|
||||
|
||||
if not self.timeout and self.failure and not self.expected:
|
||||
self.assertTrue(len(errors) > 0, errors)
|
||||
else:
|
||||
self.assertEqual(0, len(errors), errors)
|
||||
|
||||
|
||||
TestSendReceive.generate_scenarios()
|
||||
|
||||
|
@ -21,6 +21,7 @@ import testscenarios
|
||||
|
||||
import mock
|
||||
import oslo_messaging
|
||||
from oslo_messaging.rpc import server as rpc_server_module
|
||||
from oslo_messaging import server as server_module
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
|
||||
@ -326,6 +327,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
def ping(self, ctxt, arg):
|
||||
raise ValueError(arg)
|
||||
|
||||
debugs = []
|
||||
errors = []
|
||||
|
||||
def stub_debug(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
debugs.append(str(msg) % a)
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug)
|
||||
self.stubs.Set(rpc_server_module.LOG, 'error', stub_error)
|
||||
|
||||
server_thread = self._setup_server(transport, TestEndpoint())
|
||||
client = self._setup_client(transport)
|
||||
|
||||
@ -334,6 +351,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
except Exception as ex:
|
||||
self.assertIsInstance(ex, ValueError)
|
||||
self.assertEqual('dsfoo', str(ex))
|
||||
self.assertTrue(len(debugs) == 0)
|
||||
self.assertTrue(len(errors) > 0)
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
@ -342,6 +361,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
def test_expected_failure(self):
|
||||
transport = oslo_messaging.get_transport(self.conf, url='fake:')
|
||||
|
||||
debugs = []
|
||||
errors = []
|
||||
|
||||
def stub_debug(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
debugs.append(str(msg) % a)
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug)
|
||||
self.stubs.Set(rpc_server_module.LOG, 'error', stub_error)
|
||||
|
||||
class TestEndpoint(object):
|
||||
@oslo_messaging.expected_exceptions(ValueError)
|
||||
def ping(self, ctxt, arg):
|
||||
@ -355,6 +390,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
except Exception as ex:
|
||||
self.assertIsInstance(ex, ValueError)
|
||||
self.assertEqual('dsfoo', str(ex))
|
||||
self.assertTrue(len(debugs) > 0)
|
||||
self.assertTrue(len(errors) == 0)
|
||||
else:
|
||||
self.assertTrue(False)
|
||||
|
||||
|
@ -61,11 +61,6 @@ def add_remote_postfix(ex):
|
||||
|
||||
class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
|
||||
_log_failure = [
|
||||
('log_failure', dict(log_failure=True)),
|
||||
('do_not_log_failure', dict(log_failure=False)),
|
||||
]
|
||||
|
||||
_add_remote = [
|
||||
('add_remote', dict(add_remote=True)),
|
||||
('do_not_add_remote', dict(add_remote=False)),
|
||||
@ -100,27 +95,19 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._log_failure,
|
||||
cls._add_remote,
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._add_remote,
|
||||
cls._exception_types)
|
||||
|
||||
def setUp(self):
|
||||
super(SerializeRemoteExceptionTestCase, self).setUp()
|
||||
|
||||
def test_serialize_remote_exception(self):
|
||||
errors = []
|
||||
|
||||
def stub_error(msg, *a, **kw):
|
||||
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
||||
a = a[0]
|
||||
errors.append(str(msg) % a)
|
||||
|
||||
self.stubs.Set(exceptions.LOG, 'error', stub_error)
|
||||
|
||||
try:
|
||||
try:
|
||||
raise self.cls(*self.args, **self.kwargs)
|
||||
except Exception as ex:
|
||||
# Note: in Python 3 ex variable will be cleared at the end of
|
||||
# the except clause, so explicitly make an extra copy of it
|
||||
cls_error = ex
|
||||
if self.add_remote:
|
||||
ex = add_remote_postfix(ex)
|
||||
@ -128,8 +115,7 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
except Exception:
|
||||
exc_info = sys.exc_info()
|
||||
|
||||
serialized = exceptions.serialize_remote_exception(
|
||||
exc_info, log_failure=self.log_failure)
|
||||
serialized = exceptions.serialize_remote_exception(exc_info)
|
||||
|
||||
failure = jsonutils.loads(serialized)
|
||||
|
||||
@ -143,11 +129,6 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase):
|
||||
tb = cls_error.__class__.__name__ + ': ' + self.msg
|
||||
self.assertIn(tb, ''.join(failure['tb']))
|
||||
|
||||
if self.log_failure:
|
||||
self.assertTrue(len(errors) > 0, errors)
|
||||
else:
|
||||
self.assertEqual(0, len(errors), errors)
|
||||
|
||||
|
||||
SerializeRemoteExceptionTestCase.generate_scenarios()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user