Merge "Have the timeout decrement inside the wait() method"
This commit is contained in:
commit
42a2df15dd
@ -31,6 +31,30 @@ from oslo.messaging._i18n import _LI
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _DecayingTimer(object):
|
||||
def __init__(self, duration=None):
|
||||
self._duration = duration
|
||||
self._ends_at = None
|
||||
|
||||
def start(self):
|
||||
if self._duration is not None:
|
||||
self._ends_at = time.time() + max(0, self._duration)
|
||||
return self
|
||||
|
||||
def check_return(self, msg_id):
|
||||
if self._duration is None:
|
||||
return None
|
||||
if self._ends_at is None:
|
||||
raise RuntimeError("Can not check/return a timeout from a timer"
|
||||
" that has not been started")
|
||||
left = self._ends_at - time.time()
|
||||
if left <= 0:
|
||||
raise messaging.MessagingTimeout('Timed out waiting for a '
|
||||
'reply to message ID %s'
|
||||
% msg_id)
|
||||
return left
|
||||
|
||||
|
||||
class AMQPIncomingMessage(base.IncomingMessage):
|
||||
|
||||
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
|
||||
@ -221,7 +245,7 @@ class ReplyWaiter(object):
|
||||
result = data['result']
|
||||
return result, ending
|
||||
|
||||
def _poll_connection(self, msg_id, timeout):
|
||||
def _poll_connection(self, msg_id, timer):
|
||||
while True:
|
||||
while self.incoming:
|
||||
message_data = self.incoming.pop(0)
|
||||
@ -233,14 +257,14 @@ class ReplyWaiter(object):
|
||||
self.waiters.put(incoming_msg_id, message_data)
|
||||
|
||||
try:
|
||||
self.conn.consume(limit=1, timeout=timeout)
|
||||
self.conn.consume(limit=1, timeout=timer.check_return(msg_id))
|
||||
except rpc_common.Timeout:
|
||||
raise messaging.MessagingTimeout('Timed out waiting for a '
|
||||
'reply to message ID %s'
|
||||
% msg_id)
|
||||
|
||||
def _poll_queue(self, msg_id, timeout):
|
||||
message = self.waiters.get(msg_id, timeout)
|
||||
def _poll_queue(self, msg_id, timer):
|
||||
message = self.waiters.get(msg_id, timeout=timer.check_return(msg_id))
|
||||
if message is self.waiters.WAKE_UP:
|
||||
return None, None, True # lock was released
|
||||
|
||||
@ -269,6 +293,7 @@ class ReplyWaiter(object):
|
||||
# have the first thread take responsibility for passing replies not
|
||||
# intended for itself to the appropriate thread.
|
||||
#
|
||||
timer = _DecayingTimer(duration=timeout).start()
|
||||
final_reply = None
|
||||
while True:
|
||||
if self.conn_lock.acquire(False):
|
||||
@ -287,7 +312,7 @@ class ReplyWaiter(object):
|
||||
|
||||
# Now actually poll the connection
|
||||
while True:
|
||||
reply, ending = self._poll_connection(msg_id, timeout)
|
||||
reply, ending = self._poll_connection(msg_id, timer)
|
||||
if not ending:
|
||||
final_reply = reply
|
||||
else:
|
||||
@ -300,7 +325,7 @@ class ReplyWaiter(object):
|
||||
self.waiters.wake_all(msg_id)
|
||||
else:
|
||||
# We're going to wait for the first thread to pass us our reply
|
||||
reply, ending, trylock = self._poll_queue(msg_id, timeout)
|
||||
reply, ending, trylock = self._poll_queue(msg_id, timer)
|
||||
if trylock:
|
||||
# The first thread got its reply, let's try and take over
|
||||
# the responsibility for polling
|
||||
|
Loading…
x
Reference in New Issue
Block a user