diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 4026e9c55..5f858c50e 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -150,7 +150,8 @@ class NotificationServer(NotificationServerBase): try: res = self.dispatcher.dispatch(message) except Exception: - LOG.error(_LE('Exception during message handling'), exc_info=True) + LOG.exception(_LE('Exception during message handling.')) + res = notify_dispatcher.NotificationResult.REQUEUE try: if (res == notify_dispatcher.NotificationResult.REQUEUE and @@ -159,7 +160,7 @@ class NotificationServer(NotificationServerBase): else: message.acknowledge() except Exception: - LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) + LOG.exception(_LE("Fail to ack/requeue message.")) class BatchNotificationServer(NotificationServerBase): @@ -169,7 +170,7 @@ class BatchNotificationServer(NotificationServerBase): not_processed_messages = self.dispatcher.dispatch(incoming) except Exception: not_processed_messages = set(incoming) - LOG.error(_LE('Exception during message handling'), exc_info=True) + LOG.exception(_LE('Exception during messages handling.')) for m in incoming: try: if m in not_processed_messages and self._allow_requeue: @@ -177,7 +178,7 @@ class BatchNotificationServer(NotificationServerBase): else: m.acknowledge() except Exception: - LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) + LOG.exception(_LE("Fail to ack/requeue message.")) def get_notification_listener(transport, targets, endpoints, diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index a562ea38d..2fbdda77f 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -122,31 +122,38 @@ class RPCServer(msg_server.MessageHandlingServer): def _process_incoming(self, incoming): message = incoming[0] - message.acknowledge() + try: + message.acknowledge() + except Exception: + LOG.exception(_LE("Can not acknowledge message. Skip processing")) + return + + failure = None try: res = self.dispatcher.dispatch(message) except rpc_dispatcher.ExpectedException as e: LOG.debug(u'Expected exception during message handling (%s)', e.exc_info[1]) - message.reply(failure=e.exc_info) + failure = e.exc_info except Exception as e: # current sys.exc_info() content can be overriden # by another exception raise by a log handler during # LOG.exception(). So keep a copy and delete it later. - exc_info = sys.exc_info() - try: - LOG.exception(_LE('Exception during message handling: %s'), e) - message.reply(failure=exc_info) - finally: + failure = sys.exc_info() + LOG.exception(_LE('Exception during handling message')) + + try: + if failure is None: + message.reply(res) + else: + message.reply(failure=failure) + except Exception: + LOG.exception(_LE("Can not send reply for message")) + finally: # NOTE(dhellmann): Remove circular object reference # between the current stack frame and the traceback in # exc_info. - del exc_info - else: - try: - message.reply(res) - except Exception: - LOG.Exception("Can not send reply for message %s", message) + del failure def get_rpc_server(transport, target, endpoints,