Issue blocking ACK for RPC requests from the consumer thread

The patch for https://review.openstack.org/#/c/436958/ fixed a
threading problem by moving the ACK back to the polling
thread. However, the RPC server expects to catch any failure of the
ACK and abort the request.  This patch adds the ACK error handling
back to the polling thread.

This patch is based heavily off the original work done by Mehdi
Abaakouk (sileht).

Change-Id: I708c3d6676b974d8daac6817c15f596cdf35817b
Closes-Bug: #1695746
This commit is contained in:
Kenneth Giusti 2018-07-10 12:03:03 -04:00
parent b97481ce19
commit 26b0be585a
2 changed files with 74 additions and 10 deletions

View File

@ -167,8 +167,34 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
'duration': duration})
return
def heartbeat(self):
    """Send an empty, non-final reply for this message.

    A connection is obtained from the listener's driver through
    ``_get_connection(rpc_common.PURPOSE_SEND)`` and used as a context
    manager; ``_send_reply`` is then called with ``None`` for both
    positional payload arguments and ``ending=False``.
    """
    driver = self.listener.driver
    with driver._get_connection(rpc_common.PURPOSE_SEND) as send_conn:
        self._send_reply(send_conn, None, None, ending=False)
# NOTE(sileht): These have already been acked in the RpcListener IO thread.
# We keep them as no-ops until all drivers do the same.
def acknowledge(self):
    """Do nothing: the request has already been acknowledged.

    RpcAMQPListener.__call__ acknowledges each RPC request in the consumer
    callback before the request is dispatched, so scheduling another
    ``self.message.acknowledge`` from here would ack the same message a
    second time. The method is kept as a no-op so existing callers of the
    incoming-message interface continue to work.
    """
def requeue(self):
    """Intentionally a no-op; always returns ``None``."""
    return None
class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
def acknowledge(self):
    """Schedule a best-effort ack of this notification and record its id.

    The ack is wrapped in a helper that logs a warning instead of letting
    an exception escape; the helper is handed to
    ``self._message_operations_handler.do``. Afterwards this message's
    ``unique_id`` is added to the listener's ``msg_id_cache``.
    """
    def _ack_and_log_failure():
        try:
            self.message.acknowledge()
        except Exception as err:
            # NOTE(kgiusti): this failure is likely due to a loss of the
            # connection to the broker. Not much we can do in this case,
            # especially considering the Notification has already been
            # dispatched. This *could* result in message duplication
            # (unacked msg is returned to the queue by the broker), but
            # the driver tries to catch that using the msg_id_cache.
            LOG.warning("Failed to acknowledge received message: %s", err)

    ops_handler = self._message_operations_handler
    ops_handler.do(_ack_and_log_failure)

    seen_ids = self.listener.msg_id_cache
    seen_ids.add(self.unique_id)
def requeue(self):
@ -178,12 +204,12 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
# msg_id_cache, the message will be reconsumed; the only difference is
# that the message stays at the beginning of the queue instead of moving
# to the end.
self._message_operations_handler.do(self.message.requeue)
def heartbeat(self):
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
self._send_reply(conn, None, None, ending=False)
def _do_requeue():
try:
self.message.requeue()
except Exception as exc:
LOG.warning("Failed to requeue received message: %s", exc)
self._message_operations_handler.do(_do_requeue)
class ObsoleteReplyQueuesCache(object):
@ -256,7 +282,7 @@ class AMQPListener(base.PollStyleListener):
else:
LOG.debug("received message with unique_id: %s", unique_id)
self.incoming.append(AMQPIncomingMessage(
self.incoming.append(self.message_cls(
self,
ctxt.to_dict(),
message,
@ -319,6 +345,41 @@ class AMQPListener(base.PollStyleListener):
self.conn.close()
class RpcAMQPListener(AMQPListener):
    """Listener for RPC requests that acknowledges before dispatching.

    Each incoming message is wrapped in ``AMQPIncomingMessage`` (see
    ``message_cls``).
    """

    # Class the base listener instantiates for every received message.
    message_cls = AMQPIncomingMessage

    def __call__(self, message):
        """Acknowledge ``message`` and process it only if the ack succeeds.

        If ``message.acknowledge()`` raises, a warning is logged and the
        request is discarded without calling the parent ``__call__``.
        Otherwise the message is passed on to ``AMQPListener.__call__``.
        """
        # NOTE(kgiusti): In the original RPC implementation the RPC server
        # would acknowledge the request THEN process it. The goal of this
        # was to prevent duplication if the ack failed. Should the ack fail
        # the request would be discarded since the broker would not remove
        # the request from the queue since no ack was received. That would
        # lead to the request being redelivered at some point. However this
        # approach meant that the ack was issued from the dispatch thread,
        # not the consumer thread, which is bad since kombu is not thread
        # safe. So a change was made to schedule the ack to be sent on the
        # consumer thread - breaking the ability to catch ack errors before
        # dispatching the request. To fix this we do the actual ack here in
        # the consumer callback and avoid the upcall if the ack fails. See
        # https://bugs.launchpad.net/oslo.messaging/+bug/1695746
        # for all the gory details...
        try:
            message.acknowledge()
        except Exception as exc:
            LOG.warning(
                "Discarding RPC request due to failed acknowledge: %s", exc)
        else:
            # NOTE(kgiusti): be aware that even if the acknowledge call
            # succeeds there is no guarantee the broker actually gets the
            # ACK since acknowledge() simply writes the ACK to the socket
            # (there is no ACK confirmation coming back from the broker)
            super(RpcAMQPListener, self).__call__(message)
class NotificationAMQPListener(AMQPListener):
    """AMQPListener that wraps messages as notification messages."""

    # Class the base listener instantiates (via ``self.message_cls``) for
    # every received message.
    message_cls = NotificationAMQPIncomingMessage
class ReplyWaiters(object):
WAKE_UP = object()
@ -590,7 +651,7 @@ class AMQPDriverBase(base.BaseDriver):
def listen(self, target, batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
listener = RpcAMQPListener(self, conn)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
@ -608,7 +669,7 @@ class AMQPDriverBase(base.BaseDriver):
batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
listener = NotificationAMQPListener(self, conn)
for target, priority in targets_and_priorities:
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),

View File

@ -152,6 +152,9 @@ class RPCServer(msg_server.MessageHandlingServer):
def _process_incoming(self, incoming):
message = incoming[0]
# TODO(sileht): We should remove that at some point and do
# this directly in the driver
try:
message.acknowledge()
except Exception: