From 0e7775532ae1326c33aa80383fd25411a83a9b7b Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 9 Jan 2014 13:24:59 -0600 Subject: [PATCH] Ensure in_flight is decremented when ops time out Internal operations, such as control connection queries and preparing of statements, use a different method for synchronously executing queries. If those timed out, the in_flight count for the relevant connection would not be decremented, leaving the connection to appear more busy than it actually was. --- cassandra/connection.py | 5 ++++- cassandra/io/asyncorereactor.py | 8 ++------ cassandra/io/libevreactor.py | 8 ++------ 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/cassandra/connection.py b/cassandra/connection.py index 027259ea..22d4fd01 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -357,13 +357,16 @@ class Connection(object): class ResponseWaiter(object): - def __init__(self, num_responses): + def __init__(self, connection, num_responses): + self.connection = connection self.pending = num_responses self.error = None self.responses = [None] * num_responses self.event = Event() def got_response(self, response, index): + with self.connection.lock: + self.connection.in_flight -= 1 if isinstance(response, Exception): self.error = response self.event.set() diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 706e7b7c..74373576 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -347,7 +347,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): def wait_for_responses(self, *msgs, **kwargs): timeout = kwargs.get('timeout') - waiter = ResponseWaiter(len(msgs)) + waiter = ResponseWaiter(self, len(msgs)) # busy wait for sufficient space on the connection messages_sent = 0 @@ -370,11 +370,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): raise OperationTimedOut() time.sleep(0.01) - try: - return waiter.deliver(timeout) - finally: - with self.lock: - self.in_flight -= len(msgs) + return waiter.deliver(timeout) def register_watcher(self, event_type, callback): self._push_watchers[event_type].add(callback) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index e5dbcd7b..10475d1f 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -395,7 +395,7 @@ class LibevConnection(Connection): def wait_for_responses(self, *msgs, **kwargs): timeout = kwargs.get('timeout') - waiter = ResponseWaiter(len(msgs)) + waiter = ResponseWaiter(self, len(msgs)) # busy wait for sufficient space on the connection messages_sent = 0 @@ -418,11 +418,7 @@ class LibevConnection(Connection): raise OperationTimedOut() time.sleep(0.01) - try: - return waiter.deliver(timeout) - finally: - with self.lock: - self.in_flight -= len(msgs) + return waiter.deliver(timeout) def register_watcher(self, event_type, callback): self._push_watchers[event_type].add(callback)