From 94181676dbd38e2440c5af013f2caccab18c3487 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Wed, 5 Mar 2014 16:24:08 -0600 Subject: [PATCH 01/12] Ignore setuptools eggs, too --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index d24b9945..b8ce3535 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ cover/ docs/_build/ tests/integration/ccm setuptools*.tar.gz +setuptools*.egg From ca044108ba7c8a97214a5719157b0db590b1d678 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 16:22:08 -0600 Subject: [PATCH 02/12] Use prepared statements for benchmarks --- benchmarks/base.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/benchmarks/base.py b/benchmarks/base.py index 90f41703..f00df6b0 100644 --- a/benchmarks/base.py +++ b/benchmarks/base.py @@ -15,7 +15,6 @@ sys.path.append(os.path.join(dirname, '..')) from cassandra.cluster import Cluster from cassandra.io.asyncorereactor import AsyncoreConnection from cassandra.policies import HostDistance -from cassandra.query import SimpleStatement log = logging.getLogger() handler = logging.StreamHandler() @@ -86,11 +85,10 @@ def benchmark(thread_class): log.debug("Sleeping for two seconds...") time.sleep(2.0) - query = SimpleStatement(""" - INSERT INTO {table} (thekey, col1, col2) - VALUES (%(key)s, %(a)s, %(b)s) + query = session.prepare(""" + INSERT INTO {table} (thekey, col1, col2) VALUES (?, ?, ?)) """.format(table=TABLE)) - values = {'key': 'key', 'a': 'a', 'b': 'b'} + values = ('key', 'a', 'b') per_thread = options.num_ops / options.threads threads = [] From aaabb2212375fbff0a27d677f5b09d070a569213 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 17:28:25 -0600 Subject: [PATCH 03/12] Fix max_compaction_threshold option in metadata --- CHANGELOG.rst | 6 ++++++ cassandra/metadata.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c0083d12..673cf7d9 100644 --- 
a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,12 @@ Features * Support static columns in schemas, which are available starting in Cassandra 2.1. (github issue #91) +Bug Fixes +--------- +* Fix max_compaction_threshold option for TableMetadata. Without this + fix, table schemas generated by the driver will not specify a + max_compaction_threshold. + Other ----- * Don't ignore column names when parsing typestrings. This is needed for diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 0e348ae2..f797ab57 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -631,7 +631,7 @@ class TableMetadata(object): "compaction_strategy_class", "compaction_strategy_options", "min_compaction_threshold", - "max_compression_threshold", + "max_compaction_threshold", "compression_parameters", "min_index_interval", "max_index_interval", From ee6d2cf56fab610ec149f1f1254de9fac8d194cf Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 13:06:45 -0600 Subject: [PATCH 04/12] Correctly clear ccm after changing C* version Conflicts: tests/integration/__init__.py --- tests/integration/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 235288bb..c3cf977d 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -18,6 +18,7 @@ except ImportError as e: CLUSTER_NAME = 'test_cluster' CCM_CLUSTER = None +DEFAULT_CASSANDRA_VERSION = '1.2.9' path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'ccm') if not os.path.exists(path): @@ -58,14 +59,16 @@ def get_node(node_id): def setup_package(): + version = os.getenv("CASSANDRA_VERSION", DEFAULT_CASSANDRA_VERSION) try: try: cluster = CCMCluster.load(path, CLUSTER_NAME) log.debug("Found existing ccm test cluster, clearing") cluster.clear() + cluster.set_cassandra_dir(cassandra_version=version) except Exception: - log.debug("Creating new ccm test cluster") - cluster = CCMCluster(path, 
CLUSTER_NAME, cassandra_version='1.2.9') + log.debug("Creating new ccm test cluster with version %s", version) + cluster = CCMCluster(path, CLUSTER_NAME, cassandra_version=version) cluster.set_configuration_options({'start_native_transport': True}) common.switch_cluster(path, CLUSTER_NAME) cluster.populate(3) From 2503206132faf38ee344bf166979581c964f6345 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 18:33:30 -0600 Subject: [PATCH 05/12] Fix compaction, compression options when generating schemas --- cassandra/metadata.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/cassandra/metadata.py b/cassandra/metadata.py index f797ab57..6e9ed3bf 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -644,6 +644,11 @@ class TableMetadata(object): "compression", "default_time_to_live") + compaction_options = { + "min_compaction_threshold": "min_threshold", + "max_compaction_threshold": "max_threshold", + "compaction_strategy_class": "class"} + def __init__(self, keyspace_metadata, name, partition_key=None, clustering_key=None, columns=None, options=None): self.keyspace = keyspace_metadata self.name = name @@ -744,13 +749,36 @@ class TableMetadata(object): def _make_option_strings(self): ret = [] - for name, value in sorted(self.options.items()): + options_copy = dict(self.options.items()) + if not options_copy.get('compaction'): + options_copy.pop('compaction', None) + + actual_options = json.loads(options_copy.pop('compaction_strategy_options', '{}')) + for system_table_name, compact_option_name in self.compaction_options.items(): + value = options_copy.pop(system_table_name, None) + if value: + actual_options.setdefault(compact_option_name, value) + + compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()] + ret.append('compaction = {%s}' % ', '.join(compaction_option_strings)) + + for system_table_name in self.compaction_options.keys(): + 
options_copy.pop(system_table_name, None) # delete if present + options_copy.pop('compaction_strategy_option', None) + + if not options_copy.get('compression'): + params = json.loads(options_copy.pop('compression_parameters', '{}')) + if params: + param_strings = ["'%s': '%s'" % (k, v) for k, v in params.items()] + ret.append('compression = {%s}' % ', '.join(param_strings)) + + for name, value in options_copy.items(): if value is not None: if name == "comment": value = value or "" ret.append("%s = %s" % (name, protect_value(value))) - return ret + return list(sorted(ret)) def protect_name(name): From 253ce163689489b8c419d977d3283f6443487a79 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 18:52:27 -0600 Subject: [PATCH 06/12] Update changelog --- CHANGELOG.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 673cf7d9..81356908 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,9 +9,8 @@ Features Bug Fixes --------- -* Fix max_compaction_threshold option for TableMetadata. Without this - fix, table schemas generated by the driver will not specify a - max_compaction_threshold. 
+* Correctly supply compaction and compression parameters in CREATE statements + for tables when working with Cassandra 2.0+ Other ----- From e4e6e705c4ed5233996468cca26bfdf86c6a9d6a Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 16:47:07 -0600 Subject: [PATCH 07/12] Ignore reference errors in the control conn These frequently happen when the cluster is being shutdown and the control connection is refreshing the schema or ring topology --- cassandra/cluster.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index f672b836..839f58b1 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1476,6 +1476,8 @@ class ControlConnection(object): try: if self._connection: self._refresh_schema(self._connection, keyspace, table) + except ReferenceError: + pass # our weak reference to the Cluster is no good except Exception: log.debug("[control connection] Error refreshing schema", exc_info=True) self._signal_error() @@ -1522,6 +1524,8 @@ class ControlConnection(object): try: if self._connection: self._refresh_node_list_and_token_map(self._connection) + except ReferenceError: + pass # our weak reference to the Cluster is no good except Exception: log.debug("[control connection] Error refreshing node list and token map", exc_info=True) self._signal_error() From 9a49fa0a9d746b88d18525bfa86a8623efa83cf1 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 17:16:03 -0600 Subject: [PATCH 08/12] Silence control conn errors around shutdown --- cassandra/cluster.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 839f58b1..163c8cc4 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1419,7 +1419,7 @@ class ControlConnection(object): if self._is_shutdown: return - self._cluster.executor.submit(self._reconnect) + self._submit(self._reconnect) def _reconnect(self): log.debug("[control connection] 
Attempting to reconnect") @@ -1458,6 +1458,14 @@ class ControlConnection(object): self._reconnection_handler = new_handler return old + def _submit(self, *args, **kwargs): + try: + if not self._cluster._is_shutdown: + return self._cluster.executor.submit(*args, **kwargs) + except ReferenceError: + pass + return None + def shutdown(self): with self._lock: if self._is_shutdown: @@ -1483,6 +1491,9 @@ class ControlConnection(object): self._signal_error() def _refresh_schema(self, connection, keyspace=None, table=None): + if self._cluster._is_shutdown: + return + self.wait_for_schema_agreement(connection) where_clause = "" @@ -1624,9 +1635,9 @@ class ControlConnection(object): table = event['table'] or None if event['change_type'] in ("CREATED", "DROPPED"): keyspace = keyspace if table else None - self._cluster.executor.submit(self.refresh_schema, keyspace) + self._submit(self.refresh_schema, keyspace) elif event['change_type'] == "UPDATED": - self._cluster.executor.submit(self.refresh_schema, keyspace, table) + self._submit(self.refresh_schema, keyspace, table) def wait_for_schema_agreement(self, connection=None): # Each schema change typically generates two schema refreshes, one From bba3c67a8df67d4dda394390ab5db4f9fcddb421 Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Thu, 6 Mar 2014 19:07:07 -0600 Subject: [PATCH 09/12] Update missed unit test --- tests/unit/test_control_connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/test_control_connection.py b/tests/unit/test_control_connection.py index 2b7aec44..18aadfee 100644 --- a/tests/unit/test_control_connection.py +++ b/tests/unit/test_control_connection.py @@ -16,6 +16,7 @@ from cassandra.policies import (SimpleConvictionPolicy, RoundRobinPolicy, PEER_IP = "foobar" + class MockMetadata(object): def __init__(self): @@ -49,6 +50,7 @@ class MockCluster(object): reconnection_policy = ConstantReconnectionPolicy(2) down_host = None contact_points = [] + _is_shutdown = False def __init__(self): 
self.metadata = MockMetadata() From c0f3c27d7587d63d259652d89f5fc5986a55987b Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 7 Mar 2014 12:13:25 -0600 Subject: [PATCH 10/12] Lowercase boolean literals when generating schemas --- CHANGELOG.rst | 1 + cassandra/metadata.py | 2 +- tests/unit/test_metadata.py | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 81356908..61178a70 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -11,6 +11,7 @@ Bug Fixes --------- * Correctly supply compaction and compression parameters in CREATE statements for tables when working with Cassandra 2.0+ +* Lowercase boolean literals when generating schemas Other ----- diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 6e9ed3bf..172c0056 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -795,7 +795,7 @@ def protect_value(value): if value is None: return 'NULL' if isinstance(value, (int, float, bool)): - return str(value) + return str(value).lower() return "'%s'" % value.replace("'", "''") diff --git a/tests/unit/test_metadata.py b/tests/unit/test_metadata.py index ce7b20dd..5464fb3a 100644 --- a/tests/unit/test_metadata.py +++ b/tests/unit/test_metadata.py @@ -169,8 +169,8 @@ class TestNameEscaping(unittest.TestCase): """ Test cassandra.metadata.protect_value output """ - self.assertEqual(protect_value(True), "True") - self.assertEqual(protect_value(False), "False") + self.assertEqual(protect_value(True), "true") + self.assertEqual(protect_value(False), "false") self.assertEqual(protect_value(3.14), '3.14') self.assertEqual(protect_value(3), '3') self.assertEqual(protect_value('test'), "'test'") From e5b751333cfcce22b5f10a0b2067760d5088bb9f Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Fri, 7 Mar 2014 16:32:13 -0600 Subject: [PATCH 11/12] Ignore SSL_ERROR_WANT_READ/WRITE errors Before this change, the connection would be defuncted when the error occurred. 
Instead, these can safely be ignored and the recv() call retried. --- CHANGELOG.rst | 3 +++ cassandra/io/asyncorereactor.py | 6 +++++- cassandra/io/libevreactor.py | 6 +++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 61178a70..3797fccf 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,9 @@ Bug Fixes * Correctly supply compaction and compression parameters in CREATE statements for tables when working with Cassandra 2.0+ * Lowercase boolean literals when generating schemas +* Ignore SSL_ERROR_WANT_READ and SSL_ERROR_WANT_WRITE socket errors. Previously, + these resulted in the connection being defuncted, but they can safely be + ignored by the driver. Other ----- diff --git a/cassandra/io/asyncorereactor.py b/cassandra/io/asyncorereactor.py index 6f80e48a..ebb19314 100644 --- a/cassandra/io/asyncorereactor.py +++ b/cassandra/io/asyncorereactor.py @@ -258,7 +258,11 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): if len(buf) < self.in_buffer_size: break except socket.error as err: - if err.args[0] not in NONBLOCKING: + if ssl and isinstance(err, ssl.SSLError): + if err.args[0] not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE): + self.defunct(err) + return + elif err.args[0] not in NONBLOCKING: self.defunct(err) return diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 3d298fb8..0fbf21fc 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -311,7 +311,11 @@ class LibevConnection(Connection): if len(buf) < self.in_buffer_size: break except socket.error as err: - if err.args[0] not in NONBLOCKING: + if ssl and isinstance(err, ssl.SSLError): + if err.args[0] not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE): + self.defunct(err) + return + elif err.args[0] not in NONBLOCKING: self.defunct(err) return From ff7a05abec4fc8d45dbcc51c0381e81cf66c8cdc Mon Sep 17 00:00:00 2001 From: Tyler Hobbs Date: Tue, 11 Mar 2014 19:08:26 
-0500 Subject: [PATCH 12/12] Better err message when libevwrapper is not found --- CHANGELOG.rst | 1 + cassandra/io/libevreactor.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3797fccf..49935615 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -20,6 +20,7 @@ Other ----- * Don't ignore column names when parsing typestrings. This is needed for user-defined type support. (github issue #90) +* Better error message when libevwrapper is not found 1.0.2 ===== diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 0fbf21fc..a5d83d4e 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -14,7 +14,16 @@ from cassandra.connection import (Connection, ResponseWaiter, ConnectionShutdown MAX_STREAM_PER_CONNECTION) from cassandra.decoder import RegisterMessage from cassandra.marshal import int32_unpack -import cassandra.io.libevwrapper as libev +try: + import cassandra.io.libevwrapper as libev +except ImportError: + raise ImportError( + "The C extension needed to use libev was not found. This " + "probably means that you didn't have the required build dependencies " + "when installing the driver. See " + "http://datastax.github.io/python-driver/installation.html#c-extensions " + "for instructions on installing build dependencies and building " + "the C extension.") try: from cStringIO import StringIO