rabbitmq: don't wait for message ack/requeue
I don't see any obvious reason why we should wait ack/requeue is done. This waiter have already be removed from amqp1. https://git.openstack.org/cgit/openstack/oslo.messaging/tree/oslo_messaging/_drivers/amqp1_driver/controller.py#n242 So, this change remove it from rabbitmq driver too. Closes-bug: #1734788 Change-Id: I5ecedc762596181be19410b863851a0054fd6579
This commit is contained in:
parent
7c65038e79
commit
c38857e110
@ -56,13 +56,6 @@ class MessageOperationsHandler(object):
|
||||
target=self._process_in_background)
|
||||
self._shutdown_thread.daemon = True
|
||||
|
||||
# HACK(sileht): this is set by the server.Server temporary
|
||||
# to not have to rewrite the entire internal API to pass
|
||||
# executor everywhere to make Listener aware of the server
|
||||
# executor. All this hack is only for the blocking executor.
|
||||
# And it's deprecated so...
|
||||
self._executor = None
|
||||
|
||||
def stop(self):
|
||||
self._shutdown.set()
|
||||
|
||||
@ -82,26 +75,14 @@ class MessageOperationsHandler(object):
|
||||
|
||||
while True:
|
||||
try:
|
||||
task, event = self._tasks.get(block=False)
|
||||
task = self._tasks.get(block=False)
|
||||
except moves.queue.Empty:
|
||||
break
|
||||
try:
|
||||
task()
|
||||
finally:
|
||||
event.set()
|
||||
task()
|
||||
|
||||
def do(self, task):
|
||||
"Put the task in the queue and waits until the task is completed."
|
||||
if self._executor is None:
|
||||
raise RuntimeError("Unexpected error, no executor is setuped")
|
||||
elif self._executor == "blocking":
|
||||
# NOTE(sileht): Blocking will hang forever if we waiting the
|
||||
# polling thread
|
||||
task()
|
||||
else:
|
||||
event = threading.Event()
|
||||
self._tasks.put((task, event))
|
||||
event.wait()
|
||||
"Put the task in the queue."
|
||||
self._tasks.put(task)
|
||||
|
||||
|
||||
class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
|
@ -417,18 +417,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ServerListenError(self.target, ex)
|
||||
|
||||
# HACK(sileht): We temporary pass the executor to the rabbit
|
||||
# listener to fix a race with the deprecated blocking executor.
|
||||
# We do this hack because this is need only for 'synchronous'
|
||||
# executor like blocking. And this one is deprecated. Making
|
||||
# driver working in an sync and an async way is complicated
|
||||
# and blocking have 0% tests coverage.
|
||||
if hasattr(self.listener, '_poll_style_listener'):
|
||||
l = self.listener._poll_style_listener
|
||||
if hasattr(l, "_message_operations_handler"):
|
||||
l._message_operations_handler._executor = (
|
||||
self.executor_type)
|
||||
|
||||
self.listener.start(self._on_incoming)
|
||||
|
||||
@ordered(after='start')
|
||||
|
@ -0,0 +1,12 @@
|
||||
---
|
||||
other:
|
||||
- |
|
||||
On rabbitmq, in the past, acknownlegement of messages was done within the
|
||||
application callback thread/greenlet. This thread was blocked until the
|
||||
message was ack. In newton, we rewrote the message acknownlegement to
|
||||
ensure we haven't two threads writting the the socket at the same times.
|
||||
Now all pendings ack are done by the main thread. They are no more reason
|
||||
to block the application callback thread until the message is ack. Other
|
||||
driver already release the application callback threads before the message
|
||||
is acknownleged. This is also the case for rabbitmq, now.
|
||||
|
Loading…
x
Reference in New Issue
Block a user