From fda6f29306633a778b04ca9d3a9be10ee24749d9 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 10 Jan 2014 13:50:09 -0600 Subject: [PATCH] Defunct connection when internal query fails Errors that occurred during wait_for_responses(), which is used by the control connection and for preparing statements, were not causing the connection to be closed or defuncted. --- cassandra/cluster.py | 1 - cassandra/io/asyncorereactor.py | 8 +++++++- cassandra/io/libevreactor.py | 8 +++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e59fbb27..2552328b 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -785,7 +785,6 @@ class Cluster(object): self.control_connection.wait_for_schema_agreement(connection) except Exception: log.debug("Error waiting for schema agreement before preparing statements against host %s", host, exc_info=True) - # TODO: potentially error out the connection? statements = self._prepared_statements.values() for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace): diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 74373576..60c4078e 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -370,7 +370,13 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): raise OperationTimedOut() time.sleep(0.01) - return waiter.deliver(timeout) + try: + return waiter.deliver(timeout) + except OperationTimedOut: + raise + except Exception, exc: + self.defunct(exc) + raise def register_watcher(self, event_type, callback): self._push_watchers[event_type].add(callback) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 10475d1f..4cbc676d 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -418,7 +418,13 @@ class LibevConnection(Connection): raise OperationTimedOut() time.sleep(0.01) - return waiter.deliver(timeout) + try: + return waiter.deliver(timeout) + except OperationTimedOut: + raise + except Exception, exc: + self.defunct(exc) + raise def register_watcher(self, event_type, callback): self._push_watchers[event_type].add(callback)