[AMQP 1.0] Avoid unnecessary thread switch on ack

Avoid blocking the RPC Server thread when it attempts to acknowledge
the RPC request message.  Blocking is unnecessary as the acknowledge
command does not return a status and can be processed asynchronously
from the server thread.

Avoiding this context switch improves overall RPC call throughput
according to the simulator tool by approximately ten percent on my
test systems.

Change-Id: I71548eb6f9f7dcaf74cb426d4e9b369b54856419
This commit is contained in:
Kenneth Giusti 2016-10-11 10:06:37 -04:00
parent 368e3cfb47
commit bc46e64711
2 changed files with 7 additions and 11 deletions

View File

@ -235,17 +235,19 @@ class MessageDispositionTask(Task):
super(MessageDispositionTask, self).__init__()
self._disposition = disposition
self._released = released
self._wakeup = threading.Event()
def wait(self):
self._wakeup.wait()
# disposition update does not have to block the sender since there is
# no result to pend for. This avoids a thread context switch with
# every RPC call
pass
def _execute(self, controller):
try:
self._disposition(self._released)
except Exception:
pass
self._wakeup.set()
except Exception as e:
# there's really nothing we can do about a failed disposition.
LOG.exception(_LE("Message acknowledgment failed: %s"), e)
class Sender(pyngus.SenderEventHandler):

View File

@ -124,18 +124,12 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
task = controller.MessageDispositionTask(self._disposition,
released=False)
self.listener.driver._ctrl.add_task(task)
rc = task.wait()
if rc:
LOG.debug("Message acknowledge failed: %s", str(rc))
def requeue(self):
"""Schedule a MessageDispositionTask to release the message"""
task = controller.MessageDispositionTask(self._disposition,
released=True)
self.listener.driver._ctrl.add_task(task)
rc = task.wait()
if rc:
LOG.debug("Message requeue failed: %s", str(rc))
class Queue(object):