Ensure in_flight is decremented when ops time out
Internal operations, such as control connection queries and preparing of statements, use a different method for synchronously executing queries. If those timed out, the in_flight count for the relevant connection would not be decremented, leaving the connection to appear more busy than it actually was.
This commit is contained in:
@@ -357,13 +357,16 @@ class Connection(object):
|
|||||||
|
|
||||||
class ResponseWaiter(object):
|
class ResponseWaiter(object):
|
||||||
|
|
||||||
def __init__(self, num_responses):
|
def __init__(self, connection, num_responses):
|
||||||
|
self.connection = connection
|
||||||
self.pending = num_responses
|
self.pending = num_responses
|
||||||
self.error = None
|
self.error = None
|
||||||
self.responses = [None] * num_responses
|
self.responses = [None] * num_responses
|
||||||
self.event = Event()
|
self.event = Event()
|
||||||
|
|
||||||
def got_response(self, response, index):
|
def got_response(self, response, index):
|
||||||
|
with self.connection.lock:
|
||||||
|
self.connection.in_flight -= 1
|
||||||
if isinstance(response, Exception):
|
if isinstance(response, Exception):
|
||||||
self.error = response
|
self.error = response
|
||||||
self.event.set()
|
self.event.set()
|
||||||
|
|||||||
@@ -347,7 +347,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
|||||||
|
|
||||||
def wait_for_responses(self, *msgs, **kwargs):
|
def wait_for_responses(self, *msgs, **kwargs):
|
||||||
timeout = kwargs.get('timeout')
|
timeout = kwargs.get('timeout')
|
||||||
waiter = ResponseWaiter(len(msgs))
|
waiter = ResponseWaiter(self, len(msgs))
|
||||||
|
|
||||||
# busy wait for sufficient space on the connection
|
# busy wait for sufficient space on the connection
|
||||||
messages_sent = 0
|
messages_sent = 0
|
||||||
@@ -370,11 +370,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
|
|||||||
raise OperationTimedOut()
|
raise OperationTimedOut()
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
try:
|
|
||||||
return waiter.deliver(timeout)
|
return waiter.deliver(timeout)
|
||||||
finally:
|
|
||||||
with self.lock:
|
|
||||||
self.in_flight -= len(msgs)
|
|
||||||
|
|
||||||
def register_watcher(self, event_type, callback):
|
def register_watcher(self, event_type, callback):
|
||||||
self._push_watchers[event_type].add(callback)
|
self._push_watchers[event_type].add(callback)
|
||||||
|
|||||||
@@ -395,7 +395,7 @@ class LibevConnection(Connection):
|
|||||||
|
|
||||||
def wait_for_responses(self, *msgs, **kwargs):
|
def wait_for_responses(self, *msgs, **kwargs):
|
||||||
timeout = kwargs.get('timeout')
|
timeout = kwargs.get('timeout')
|
||||||
waiter = ResponseWaiter(len(msgs))
|
waiter = ResponseWaiter(self, len(msgs))
|
||||||
|
|
||||||
# busy wait for sufficient space on the connection
|
# busy wait for sufficient space on the connection
|
||||||
messages_sent = 0
|
messages_sent = 0
|
||||||
@@ -418,11 +418,7 @@ class LibevConnection(Connection):
|
|||||||
raise OperationTimedOut()
|
raise OperationTimedOut()
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
try:
|
|
||||||
return waiter.deliver(timeout)
|
return waiter.deliver(timeout)
|
||||||
finally:
|
|
||||||
with self.lock:
|
|
||||||
self.in_flight -= len(msgs)
|
|
||||||
|
|
||||||
def register_watcher(self, event_type, callback):
|
def register_watcher(self, event_type, callback):
|
||||||
self._push_watchers[event_type].add(callback)
|
self._push_watchers[event_type].add(callback)
|
||||||
|
|||||||
Reference in New Issue
Block a user