From feb73eb7f5a987ed959e9de8c2039b1418bab3f4 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 16 Apr 2014 16:00:47 -0500 Subject: [PATCH 01/14] Fix typo in benchmark insert statement --- benchmarks/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index d6ed3005..ae4b44fc 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -100,7 +100,7 @@ def benchmark(thread_class): time.sleep(2.0) query = session.prepare(""" - INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?)) + INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?) """.format(table=TABLE)) values = ('key', 'a', 'b') From 9491928ff6d09962d56c600cd07a8c6ae0917f23 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 16 Apr 2014 16:24:27 -0500 Subject: [PATCH 02/14] Release version 1.1.0 --- CHANGELOG.rst | 2 +- cassandra/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 22b2745f..01568398 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,6 @@ 1.1.0 ===== -In Progress +April 16, 2014 Features -------- diff --git a/cassandra/__init__.py b/cassandra/__init__.py index a6cef863..ebee30c3 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ class NullHandler(logging.Handler): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (1, 0, 2, 'post') +__version_info__ = (1, 1, 0) __version__ = '.'.join(map(str, __version_info__)) From f9b59182f89ac5a7c0a7e43b0bf0e83f46d23703 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 16 Apr 2014 16:28:56 -0500 Subject: [PATCH 03/14] Increment version post-release --- cassandra/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index ebee30c3..6827e08d 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ class NullHandler(logging.Handler): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (1, 1, 0) +__version_info__ = (1, 1, 0, 'post') __version__ = '.'.join(map(str, __version_info__)) From b4a918e4857da7dc662ae1db6f747abefb7c83fe Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 16 Apr 2014 19:06:33 -0500 Subject: [PATCH 04/14] Fix unconditional import of nose in setup.py Fixes #111 --- CHANGELOG.rst | 8 ++++++++ setup.py | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 01568398..3e3a151e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,11 @@ +1.1.1 +===== +April 16, 2014 + +Bug Fixes +--------- +* Fix unconditional import of nose in setup.py (github #111) + 1.1.0 ===== April 16, 2014 diff --git a/setup.py b/setup.py index f4ecfa5e..11f7322d 100644 --- a/setup.py +++ b/setup.py @@ -17,9 +17,11 @@ import sys import ez_setup ez_setup.use_setuptools() +run_gevent_nosetests = False if __name__ == '__main__' and sys.argv[1] == "gevent_nosetests": from gevent.monkey import patch_all patch_all() + run_gevent_nosetests = True from setuptools import setup from distutils.command.build_ext import build_ext @@ -39,8 +41,6 @@ try: except ImportError: has_subprocess = False -from nose.commands import nosetests - from cassandra import __version__ long_description = "" @@ -48,8 +48,12 @@ with open("README.rst") as f: long_description = f.read() -class gevent_nosetests(nosetests): - description = "run nosetests with gevent monkey patching" +gevent_nosetests = None +if run_gevent_nosetests: + from nose.commands import nosetests + + class gevent_nosetests(nosetests): + description = "run nosetests with gevent monkey patching" class DocCommand(Command): @@ -169,7 +173,10 @@ On OSX, via homebrew: def run_setup(extensions): - kw = {'cmdclass': {'doc': DocCommand, 'gevent_nosetests': gevent_nosetests}} + kw = {'cmdclass': {'doc': DocCommand}} + if gevent_nosetests is not None: + kw['cmdclass']['gevent_nosetests'] = gevent_nosetests + if extensions: kw['cmdclass']['build_ext'] = build_extensions kw['ext_modules'] = extensions From 840064a9e27929c5f44ba06b72bfc3e69d905ee6 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 16 Apr 2014 19:13:20 -0500 Subject: [PATCH 05/14] Release 1.1.1 --- cassandra/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 6827e08d..3a912c23 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ class NullHandler(logging.Handler): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (1, 1, 0, 'post') +__version_info__ = (1, 1, 1) __version__ = '.'.join(map(str, __version_info__)) From 3a028b80c3f6cad78a147fc2e458aa4789dec443 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 16 Apr 2014 19:23:49 -0500 Subject: [PATCH 06/14] Remove gevent from TODO list in readme --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 3b224f67..f56c6d13 100644 --- a/README.rst +++ b/README.rst @@ -49,7 +49,7 @@ you can use `freenode's web-based client Date: Thu, 17 Apr 2014 18:46:38 -0500 Subject: [PATCH 07/14] Jenkins-ify these test exceptions on the master branch --- tests/integration/standard/test_connection.py | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py index 3b585f67..8d6ab593 100644 --- a/tests/integration/standard/test_connection.py +++ b/tests/integration/standard/test_connection.py @@ -22,7 +22,8 @@ except ImportError: from functools import partial from threading import Thread, Event -from cassandra import ConsistencyLevel +from cassandra import ConsistencyLevel, OperationTimedOut +from cassandra.cluster import NoHostAvailable from cassandra.decoder import QueryMessage from cassandra.io.asyncorereactor import AsyncoreConnection @@ -36,11 +37,33 @@ class ConnectionTest(object): klass = None + def get_connection(self): + """ + Helper method to solve automated testing issues within Jenkins. + Officially patched under the 2.0 branch through + 17998ef72a2fe2e67d27dd602b6ced33a58ad8ef, but left as is for the + 1.0 branch due to possible regressions for fixing an + automated testing edge-case. + """ + conn = None + e = None + for i in xrange(5): + try: + conn = self.klass.factory() + break + except (OperationTimedOut, NoHostAvailable) as e: + continue + + if conn: + return conn + else: + raise e + def test_single_connection(self): """ Test a single connection with sequential requests. """ - conn = self.klass.factory() + conn = self.get_connection() query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1" event = Event() @@ -63,7 +86,7 @@ class ConnectionTest(object): """ Test a single connection with pipelined requests. """ - conn = self.klass.factory() + conn = self.get_connection() query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1" responses = [False] * 100 event = Event() @@ -85,7 +108,7 @@ class ConnectionTest(object): """ Test multiple connections with pipelined requests. """ - conns = [self.klass.factory() for i in range(5)] + conns = [self.get_connection() for i in range(5)] events = [Event() for i in range(5)] query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1" @@ -116,7 +139,7 @@ class ConnectionTest(object): num_threads = 5 event = Event() - conn = self.klass.factory() + conn = self.get_connection() query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1" def cb(all_responses, thread_responses, request_num, *args, **kwargs): @@ -173,7 +196,7 @@ class ConnectionTest(object): threads = [] for i in range(num_conns): - conn = self.klass.factory() + conn = self.get_connection() t = Thread(target=send_msgs, args=(conn, events[i])) threads.append(t) From 335acf3e2ea91882e36fd254818a564cf8ded5f8 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 22 Apr 2014 12:00:43 -0500 Subject: [PATCH 08/14] Update token metadata when a node is removed --- CHANGELOG.rst | 9 +++++++++ cassandra/cluster.py | 1 + 2 files changed, 10 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3e3a151e..910f4189 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,12 @@ +1.1.2 +===== +In Progress + +Bug Fixes +--------- +* Update token metadata (for TokenAware calculations) when a node is removed + from the ring + 1.1.1 ===== April 16, 2014 diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 58c928d9..bb34a947 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -765,6 +765,7 @@ class Cluster(object): session.on_remove(host) for listener in self.listeners: listener.on_remove(host) + self.control_connection.on_remove(host) def signal_connection_failure(self, host, connection_exc, is_host_addition): is_down = host.signal_connection_failure(connection_exc) From c0d8a1089bcb6fd9c650f01f794dfd13881e35aa Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 22 Apr 2014 12:04:01 -0500 Subject: [PATCH 09/14] Update load balancing policy integration test --- tests/integration/long/test_loadbalancingpolicies.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index ca8e8495..6c46103d 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -364,9 +364,12 @@ class LoadBalancingPolicyTests(unittest.TestCase): self._query(session, keyspace, use_prepared=use_prepared) - self.coordinator_stats.assert_query_count_equals(self, 1, 6) + # TODO: this depends on the token order of the cluster; either set up + # the tokens deliberately or confirm that only one node is used and + # it's the correct replica + self.coordinator_stats.assert_query_count_equals(self, 1, 12) self.coordinator_stats.assert_query_count_equals(self, 2, 0) - self.coordinator_stats.assert_query_count_equals(self, 3, 6) + self.coordinator_stats.assert_query_count_equals(self, 3, 0) def test_token_aware_composite_key(self): use_singledc() From a223c3a7d89b878746b4b7841add75f23f74219c Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 22 Apr 2014 13:00:45 -0500 Subject: [PATCH 10/14] fix loadbalancing fallback tests --- .../long/test_loadbalancingpolicies.py | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index 6c46103d..9e836ac5 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -359,17 +359,29 @@ class LoadBalancingPolicyTests(unittest.TestCase): self.coordinator_stats.assert_query_count_equals(self, 3, 0) self.coordinator_stats.reset_counts() + stop(2) + wait_for_down(cluster, 2, wait=True) + + try: + self._query(session, keyspace, use_prepared=use_prepared) + self.fail() + except Unavailable: + pass + + self.coordinator_stats.reset_counts() + start(2) + wait_for_up(cluster, 2, wait=True) decommission(2) wait_for_down(cluster, 2, wait=True) self._query(session, keyspace, use_prepared=use_prepared) - # TODO: this depends on the token order of the cluster; either set up - # the tokens deliberately or confirm that only one node is used and - # it's the correct replica - self.coordinator_stats.assert_query_count_equals(self, 1, 12) + results = { + self.coordinator_stats.get_query_count(1), + self.coordinator_stats.get_query_count(3) + } + self.assertEqual(results, {0, 12}) self.coordinator_stats.assert_query_count_equals(self, 2, 0) - self.coordinator_stats.assert_query_count_equals(self, 3, 0) def test_token_aware_composite_key(self): use_singledc() From cbe3dc9bf18e681a9a293b4f2cc1faa1fea5fc7c Mon Sep 17 00:00:00 2001 From: Joaquin Casares Date: Tue, 22 Apr 2014 14:37:14 -0500 Subject: [PATCH 11/14] Use python 2.6 sets --- tests/integration/long/test_loadbalancingpolicies.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/long/test_loadbalancingpolicies.py b/tests/integration/long/test_loadbalancingpolicies.py index 9e836ac5..10a8ab36 100644 --- a/tests/integration/long/test_loadbalancingpolicies.py +++ b/tests/integration/long/test_loadbalancingpolicies.py @@ -376,11 +376,11 @@ class LoadBalancingPolicyTests(unittest.TestCase): self._query(session, keyspace, use_prepared=use_prepared) - results = { + results = set([ self.coordinator_stats.get_query_count(1), self.coordinator_stats.get_query_count(3) - } - self.assertEqual(results, {0, 12}) + ]) + self.assertEqual(results, set([0, 12])) self.coordinator_stats.assert_query_count_equals(self, 2, 0) def test_token_aware_composite_key(self): From b7b49b1db0562d7703cbb0f69cf58eab917861b1 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 22 Apr 2014 15:10:26 -0500 Subject: [PATCH 12/14] Don't block on killing reader/writer Greenlets --- CHANGELOG.rst | 2 ++ cassandra/io/geventreactor.py | 14 ++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 910f4189..d72fcf7d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,6 +6,8 @@ Bug Fixes --------- * Update token metadata (for TokenAware calculations) when a node is removed from the ring +* Fix file handle leak with gevent reactor due to blocking Greenlet kills when + closing excess connections 1.1.1 ===== diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index 322b9c00..94e069af 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -98,9 +98,9 @@ class GeventConnection(Connection): log.debug("Closing connection (%s) to %s" % (id(self), self.host)) if self._read_watcher: - self._read_watcher.kill() + self._read_watcher.kill(block=False) if self._write_watcher: - self._write_watcher.kill() + self._write_watcher.kill(block=False) if self._socket: self._socket.close() log.debug("Closed socket to %s" % (self.host,)) @@ -122,8 +122,9 @@ class GeventConnection(Connection): next_msg = self._write_queue.get() run_select() except Exception as exc: - log.debug("Exception during write select() for %s: %s", self, exc) - self.defunct(exc) + if not self.is_closed: + log.debug("Exception during write select() for %s: %s", self, exc) + self.defunct(exc) return try: @@ -139,8 +140,9 @@ class GeventConnection(Connection): try: run_select() except Exception as exc: - log.debug("Exception during read select() for %s: %s", self, exc) - self.defunct(exc) + if not self.is_closed: + log.debug("Exception during read select() for %s: %s", self, exc) + self.defunct(exc) return try: From 469dd674f3db24a903feebccb200f1b0b6223118 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 22 Apr 2014 16:48:44 -0500 Subject: [PATCH 13/14] Avoid multiple simultaneous node-up handlers --- CHANGELOG.rst | 2 ++ cassandra/cluster.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d72fcf7d..c53c6731 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,8 @@ Bug Fixes from the ring * Fix file handle leak with gevent reactor due to blocking Greenlet kills when closing excess connections +* Avoid handling a node coming up multiple times due to a reconnection attempt + succeeding close to the same time that an UP notification is pushed 1.1.1 ===== diff --git a/cassandra/cluster.py b/cassandra/cluster.py index bb34a947..581d7768 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -599,7 +599,7 @@ class Cluster(object): host._handle_node_up_condition.acquire() while host._currently_handling_node_up: host._handle_node_up_condition.wait() - host.handling_up_down = True + host._currently_handling_node_up = True host._handle_node_up_condition.release() if host.is_up: From d20db205f1bda260362fd3a2097fa6f4b46c5360 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 22 Apr 2014 16:53:59 -0500 Subject: [PATCH 14/14] Wait an extra second before handling UP messages --- cassandra/cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 581d7768..4d5a030d 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1659,10 +1659,10 @@ class ControlConnection(object): if change_type == "UP": if host is None: # this is the first time we've seen the node - self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map) + self._cluster.scheduler.schedule(2, self.refresh_node_list_and_token_map) else: # this will be run by the scheduler - self._cluster.scheduler.schedule(1, self._cluster.on_up, host) + self._cluster.scheduler.schedule(2, self._cluster.on_up, host) elif change_type == "DOWN": # Note that there is a slight risk we can receive the event late and thus # mark the host down even though we already had reconnected successfully.