Merge branch '2.0' into del-and-daemon-thread-cleanup

Conflicts:
	cassandra/cluster.py
	cassandra/connection.py
	cassandra/io/asyncorereactor.py
	cassandra/io/libevreactor.py
This commit is contained in:
Tyler Hobbs
2014-04-16 15:20:46 -05:00
72 changed files with 1874 additions and 75 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
*.swp
*.swo
*.so
*.egg
*.egg-info
.tox
.idea/

View File

@@ -8,6 +8,9 @@ Features
github issue #46)
* Support static columns in schemas, which are available starting in
Cassandra 2.1. (github issue #91)
* Add debian packaging (github issue #101)
* Add utility methods for easy concurrent execution of statements. See
the new cassandra.concurrent module. (github issue #7)
Bug Fixes
---------
@@ -27,6 +30,8 @@ Bug Fixes
and rack information has been set, if possible.
* Avoid KeyError when updating metadata after droping a table (github issues
#97, #98)
* Use tuples instead of sets for DCAwareLoadBalancingPolicy to ensure equal
distribution of requests
Other
-----
@@ -34,6 +39,10 @@ Other
user-defined type support. (github issue #90)
* Better error message when libevwrapper is not found
* Only try to import scales when metrics are enabled (github issue #92)
* Cut down on the number of queries executing when a new Cluster
connects and when the control connection has to reconnect (github issue #104,
PYTHON-59)
* Issue warning log when schema versions do not match
1.0.2
=====

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cProfile import Profile
import logging
import os.path

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from itertools import count

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import Queue

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import Queue

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from base import benchmark, BenchmarkThread

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from base import benchmark, BenchmarkThread
class Runner(BenchmarkThread):

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module houses the main classes you will interact with,
:class:`.Cluster` and :class:`.Session`.
@@ -5,6 +19,7 @@ This module houses the main classes you will interact with,
from __future__ import absolute_import
import atexit
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import logging
import socket
@@ -41,7 +56,7 @@ from cassandra.policies import (RoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
RetryPolicy)
from cassandra.pool import (_ReconnectionHandler, _HostReconnectionHandler,
HostConnectionPool)
HostConnectionPool, NoConnectionsAvailable)
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
BatchStatement, bind_params, QueryTrace, Statement,
named_tuple_factory, dict_factory)
@@ -1369,8 +1384,8 @@ class ControlConnection(object):
_SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies"
_SELECT_COLUMNS = "SELECT * FROM system.schema_columns"
_SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address FROM system.peers"
_SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner FROM system.local WHERE key='local'"
_SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
_SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, schema_version FROM system.local WHERE key='local'"
_SELECT_SCHEMA_PEERS = "SELECT rpc_address, schema_version FROM system.peers"
_SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
@@ -1459,8 +1474,13 @@ class ControlConnection(object):
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
}, register_timeout=self._timeout)
self._refresh_node_list_and_token_map(connection)
self._refresh_schema(connection)
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=ConsistencyLevel.ONE)
shared_results = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)
self._refresh_node_list_and_token_map(connection, preloaded_results=shared_results)
self._refresh_schema(connection, preloaded_results=shared_results)
except Exception:
connection.close()
raise
@@ -1544,11 +1564,11 @@ class ControlConnection(object):
log.debug("[control connection] Error refreshing schema", exc_info=True)
self._signal_error()
def _refresh_schema(self, connection, keyspace=None, table=None):
def _refresh_schema(self, connection, keyspace=None, table=None, preloaded_results=None):
if self._cluster.is_shutdown:
return
self.wait_for_schema_agreement(connection)
self.wait_for_schema_agreement(connection, preloaded_results=preloaded_results)
where_clause = ""
if keyspace:
@@ -1595,13 +1615,19 @@ class ControlConnection(object):
log.debug("[control connection] Error refreshing node list and token map", exc_info=True)
self._signal_error()
def _refresh_node_list_and_token_map(self, connection):
log.debug("[control connection] Refreshing node list and token map")
cl = ConsistencyLevel.ONE
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=cl)
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=cl)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None):
if preloaded_results:
log.debug("[control connection] Refreshing node list and token map using preloaded results")
peers_result = preloaded_results[0]
local_result = preloaded_results[1]
else:
log.debug("[control connection] Refreshing node list and token map")
cl = ConsistencyLevel.ONE
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=cl)
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=cl)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)
peers_result = dict_factory(*peers_result.results)
partitioner = None
@@ -1709,7 +1735,7 @@ class ControlConnection(object):
elif event['change_type'] == "UPDATED":
self._submit(self.refresh_schema, keyspace, table)
def wait_for_schema_agreement(self, connection=None):
def wait_for_schema_agreement(self, connection=None, preloaded_results=None):
# Each schema change typically generates two schema refreshes, one
# from the response type and one from the pushed notification. Holding
# a lock is just a simple way to cut down on the number of schema queries
@@ -1718,14 +1744,24 @@ class ControlConnection(object):
if self._is_shutdown:
return
log.debug("[control connection] Waiting for schema agreement")
if not connection:
connection = self._connection
if preloaded_results:
log.debug("[control connection] Attempting to use preloaded results for schema agreement")
peers_result = preloaded_results[0]
local_result = preloaded_results[1]
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.host)
if schema_mismatches is None:
return True
log.debug("[control connection] Waiting for schema agreement")
start = self._time.time()
elapsed = 0
cl = ConsistencyLevel.ONE
total_timeout = self._cluster.max_schema_agreement_wait
schema_mismatches = None
while elapsed < total_timeout:
peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl)
local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl)
@@ -1739,36 +1775,45 @@ class ControlConnection(object):
elapsed = self._time.time() - start
continue
peers_result = dict_factory(*peers_result.results)
versions = set()
if local_result.results:
local_row = dict_factory(*local_result.results)[0]
if local_row.get("schema_version"):
versions.add(local_row.get("schema_version"))
for row in peers_result:
if not row.get("rpc_address") or not row.get("schema_version"):
continue
rpc = row.get("rpc_address")
if rpc == "0.0.0.0": # TODO ipv6 check
rpc = row.get("peer")
peer = self._cluster.metadata.get_host(rpc)
if peer and peer.is_up:
versions.add(row.get("schema_version"))
if len(versions) == 1:
log.debug("[control connection] Schemas match")
schema_mismatches = self._get_schema_mismatches(peers_result, local_result, connection.host)
if schema_mismatches is None:
return True
log.debug("[control connection] Schemas mismatched, trying again")
self._time.sleep(0.2)
elapsed = self._time.time() - start
log.warn("Node %s is reporting a schema disagreement: %s",
connection.host, schema_mismatches)
return False
def _get_schema_mismatches(self, peers_result, local_result, local_address):
peers_result = dict_factory(*peers_result.results)
versions = defaultdict(set)
if local_result.results:
local_row = dict_factory(*local_result.results)[0]
if local_row.get("schema_version"):
versions[local_row.get("schema_version")].add(local_address)
for row in peers_result:
if not row.get("rpc_address") or not row.get("schema_version"):
continue
rpc = row.get("rpc_address")
if rpc == "0.0.0.0": # TODO ipv6 check
rpc = row.get("peer")
peer = self._cluster.metadata.get_host(rpc)
if peer and peer.is_up:
versions[row.get("schema_version")].add(rpc)
if len(versions) == 1:
log.debug("[control connection] Schemas match")
return None
return dict((version, list(nodes)) for version, nodes in versions.iteritems())
def _signal_error(self):
# try just signaling the cluster, as this will trigger a reconnect
# as part of marking the host down
@@ -1987,6 +2032,10 @@ class ResponseFuture(object):
# TODO get connectTimeout from cluster settings
connection = pool.borrow_connection(timeout=2.0)
request_id = connection.send_msg(message, cb=cb)
except NoConnectionsAvailable as exc:
log.debug("All connections for host %s are at capacity, moving to the next host", host)
self._errors[host] = exc
return None
except Exception as exc:
log.debug("Error querying host %s", host, exc_info=True)
self._errors[host] = exc

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from itertools import count, cycle
@@ -51,9 +65,16 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
if not statements_and_parameters:
return []
# TODO handle iterators and generators naturally without converting the
# whole thing to a list. This would requires not building a result
# list of Nones up front (we don't know how many results there will be),
# so a dict keyed by index should be used instead. The tricky part is
# knowing when you're the final statement to finish.
statements_and_parameters = list(statements_and_parameters)
event = Event()
first_error = [] if raise_on_first_error else None
to_execute = len(statements_and_parameters) # TODO handle iterators/generators
to_execute = len(statements_and_parameters)
results = [None] * to_execute
num_finished = count(start=1)
statements = enumerate(iter(statements_and_parameters))
@@ -62,7 +83,12 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
event.wait()
if first_error:
raise first_error[0]
exc = first_error[0]
if isinstance(exc, tuple):
(exc_type, value, traceback) = exc
raise exc_type, value, traceback
else:
raise exc
else:
return results

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import errno
from functools import wraps, partial

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Representation of Cassandra data types. These classes should make it simple for
the library (and caller software) to deal with Cassandra-style Java class type

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import socket
from uuid import UUID

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from binascii import hexlify
import calendar
import datetime

View File

@@ -0,0 +1,14 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
from collections import deque
from functools import partial

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gevent
from gevent import select, socket
from gevent.event import Event

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
from collections import deque
from functools import partial

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from bisect import bisect_right
from collections import defaultdict
from hashlib import md5

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
import logging

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import islice, cycle, groupby, repeat
import logging
from random import randint
@@ -210,7 +224,7 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
def populate(self, cluster, hosts):
for dc, dc_hosts in groupby(hosts, lambda h: self._dc(h)):
self._dc_live_hosts[dc] = frozenset(dc_hosts)
self._dc_live_hosts[dc] = tuple(set(dc_hosts))
# position is currently only used for local hosts
local_live = self._dc_live_hosts.get(self.local_dc)
@@ -244,7 +258,7 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
pos = self._position
self._position += 1
local_live = list(self._dc_live_hosts.get(self.local_dc, ()))
local_live = self._dc_live_hosts.get(self.local_dc, ())
pos = (pos % len(local_live)) if local_live else 0
for host in islice(cycle(local_live), pos, pos + len(local_live)):
yield host
@@ -253,32 +267,36 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
if dc == self.local_dc:
continue
for host in list(current_dc_hosts)[:self.used_hosts_per_remote_dc]:
for host in current_dc_hosts[:self.used_hosts_per_remote_dc]:
yield host
def on_up(self, host):
dc = self._dc(host)
with self._hosts_lock:
current_hosts = self._dc_live_hosts.setdefault(dc, frozenset())
self._dc_live_hosts[dc] = current_hosts.union((host, ))
current_hosts = self._dc_live_hosts.setdefault(dc, ())
if host not in current_hosts:
self._dc_live_hosts[dc] = current_hosts + (host, )
def on_down(self, host):
dc = self._dc(host)
with self._hosts_lock:
current_hosts = self._dc_live_hosts.setdefault(dc, frozenset())
self._dc_live_hosts[dc] = current_hosts.difference((host, ))
current_hosts = self._dc_live_hosts.setdefault(dc, ())
if host in current_hosts:
self._dc_live_hosts[dc] = tuple(h for h in current_hosts if h != host)
def on_add(self, host):
dc = self._dc(host)
with self._hosts_lock:
current_hosts = self._dc_live_hosts.setdefault(dc, frozenset())
self._dc_live_hosts[dc] = current_hosts.union((host, ))
current_hosts = self._dc_live_hosts.setdefault(dc, ())
if host not in current_hosts:
self._dc_live_hosts[dc] = current_hosts + (host, )
def on_remove(self, host):
dc = self._dc(host)
with self._hosts_lock:
current_hosts = self._dc_live_hosts.setdefault(dc, frozenset())
self._dc_live_hosts[dc] = current_hosts.difference((host, ))
current_hosts = self._dc_live_hosts.setdefault(dc, ())
if host in current_hosts:
self._dc_live_hosts[dc] = tuple(h for h in current_hosts if h != host)
class TokenAwarePolicy(LoadBalancingPolicy):

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Connection pooling and host management.
"""
@@ -250,7 +264,7 @@ class _HostReconnectionHandler(_ReconnectionHandler):
if isinstance(exc, AuthenticationFailed):
return False
else:
log.warn("Error attempting to reconnect to %s, scheduling retry in %f seconds: %s",
log.warn("Error attempting to reconnect to %s, scheduling retry in %s seconds: %s",
self.host, next_delay, exc)
log.debug("Reconnection error details", exc_info=True)
return True

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module holds classes for working with prepared statements and
specifying consistency levels and retry policies for individual
@@ -308,12 +322,49 @@ class BoundStatement(Statement):
def bind(self, values):
"""
Binds a sequence of values for the prepared statement parameters
and returns this instance. Note that `values` *must* be a
sequence, even if you are only binding one value.
and returns this instance. Note that `values` *must* be:
* a sequence, even if you are only binding one value, or
* a dict that relates 1-to-1 between dict keys and columns
"""
if values is None:
values = ()
col_meta = self.prepared_statement.column_metadata
# special case for binding dicts
if isinstance(values, dict):
dict_values = values
values = []
# sort values accordingly
for col in col_meta:
try:
values.append(dict_values[col[2]])
except KeyError:
raise KeyError(
'Column name `%s` not found in bound dict.' %
(col[2]))
# ensure a 1-to-1 dict keys to columns relationship
if len(dict_values) != len(col_meta):
# find expected columns
columns = set()
for col in col_meta:
columns.add(col[2])
# generate error message
if len(dict_values) > len(col_meta):
difference = set(dict_values.keys()).difference(columns)
msg = "Too many arguments provided to bind() (got %d, expected %d). " + \
"Unexpected keys %s."
else:
difference = set(columns).difference(dict_values.keys())
msg = "Too few arguments provided to bind() (got %d, expected %d). " + \
"Expected keys %s."
# exit with error message
msg = msg % (len(values), len(col_meta), difference)
raise ValueError(msg)
if len(values) > len(col_meta):
raise ValueError(
"Too many arguments provided to bind() (got %d, expected %d)" %
@@ -567,7 +618,8 @@ class QueryTrace(object):
while True:
time_spent = time.time() - start
if max_wait is not None and time_spent >= max_wait:
raise TraceUnavailable("Trace information was not available within %f seconds" % (max_wait,))
raise TraceUnavailable(
"Trace information was not available within %f seconds. Consider raising Session.max_trace_wait." % (max_wait,))
log.debug("Attempting to fetch trace info for trace ID: %s", self.trace_id)
session_results = self._execute(

5
debian/changelog vendored Normal file
View File

@@ -0,0 +1,5 @@
python-cassandra-driver (1.1.0~prerelease-1) unstable; urgency=low
* Initial packaging
-- paul cannon <pik@debian.org> Thu, 03 Apr 2014 10:30:11 -0600

1
debian/compat vendored Normal file
View File

@@ -0,0 +1 @@
9

46
debian/control vendored Normal file
View File

@@ -0,0 +1,46 @@
Source: python-cassandra-driver
Maintainer: paul cannon <pik@debian.org>
Section: python
Priority: optional
Build-Depends: python-all-dev (>= 2.6.6-3), python-all-dbg, debhelper (>= 9),
python-sphinx (>= 1.0.7+dfsg) | python3-sphinx, libev-dev,
python-concurrent.futures | python-futures, python-setuptools,
python-nose, python-mock, python-yaml, python-gevent,
python-blist, python-tz
X-Python-Version: >= 2.7
Standards-Version: 3.9.4
Package: python-cassandra-driver
Architecture: any
Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends}, python-blist,
python-concurrent.futures | python-futures
Provides: ${python:Provides}
Recommends: python-scales
Suggests: python-cassandra-driver-doc
Description: Python driver for Apache Cassandra
This driver works exclusively with the Cassandra Query Language v3 (CQL3)
and Cassandra's native protocol. As such, only Cassandra 1.2+ is supported.
Package: python-cassandra-driver-dbg
Architecture: any
Depends: ${misc:Depends}, ${python:Depends}, ${shlibs:Depends},
python-cassandra-driver (= ${binary:Version})
Provides: ${python:Provides}
Section: debug
Priority: extra
Description: Python driver for Apache Cassandra (debug build and symbols)
This driver works exclusively with the Cassandra Query Language v3 (CQL3)
and Cassandra's native protocol. As such, only Cassandra 1.2+ is supported.
.
This package contains debug builds of the extensions and debug symbols for
the extensions in the main package.
Package: python-cassandra-driver-doc
Architecture: all
Section: doc
Priority: extra
Depends: ${misc:Depends}, ${sphinxdoc:Depends}
Suggests: python-cassandra-driver
Description: Python driver for Apache Cassandra (documentation)
This contains HTML documentation for the use of the Python Cassandra
driver.

28
debian/copyright vendored Normal file
View File

@@ -0,0 +1,28 @@
Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: python-driver
Upstream-Contact: Tyler Hobbs <tyler@datastax.com>
Source: https://github.com/datastax/python-driver
Files: *
Copyright: Copyright 2013, DataStax
License: Apache-2.0
Files: debian/*
Copyright: Copyright (c) 2014 by Space Monkey, Inc.
License: Apache-2.0
License: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
.
http://www.apache.org/licenses/LICENSE-2.0
.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
.
On Debian systems, the full text of the Apache License version 2.0
can be found in the file `/usr/share/common-licenses/Apache-2.0'.

View File

@@ -0,0 +1,40 @@
From: paul cannon <paul@spacemonkey.com>
Date: Thu, 3 Apr 2014 11:27:09 -0600
Subject: don't use ez_setup
Debian packages aren't supposed to download stuff while building, and
since the version of setuptools in stable is less than the one ez_setup
wants, and since some system python packages don't ship their .egg-info
directories, it might try.
It's ok though, we can rely on the Depends and Build-Depends for making
sure python-setuptools and the various other deps are around at the right
times.
---
setup.py | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/setup.py b/setup.py
index 0c28d3d..c0fd6c1 100644
--- a/setup.py
+++ b/setup.py
@@ -1,8 +1,5 @@
import sys
-import ez_setup
-ez_setup.use_setuptools()
-
if __name__ == '__main__' and sys.argv[1] == "gevent_nosetests":
from gevent.monkey import patch_all
patch_all()
@@ -174,8 +171,8 @@ def run_setup(extensions):
author_email='tyler@datastax.com',
packages=['cassandra', 'cassandra.io'],
include_package_data=True,
- install_requires=dependencies,
- tests_require=['nose', 'mock', 'ccm', 'unittest2', 'PyYAML', 'pytz'],
+ install_requires=(),
+ tests_require=(),
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',

1
debian/patches/series vendored Normal file
View File

@@ -0,0 +1 @@
0001-don-t-use-ez_setup.patch

View File

@@ -0,0 +1,2 @@
usr/lib/python2*/*-packages/cassandra/*_d.so
usr/lib/python2*/*-packages/cassandra/io/*_d.so

View File

@@ -0,0 +1 @@
docs/_build/*/*

View File

@@ -0,0 +1,4 @@
usr/lib/python2*/*-packages/cassandra/*[!_][!_].so
usr/lib/python2*/*-packages/cassandra/*.py
usr/lib/python2*/*-packages/cassandra/io/*[!_][!_].so
usr/lib/python2*/*-packages/cassandra/io/*.py

16
debian/rules vendored Executable file
View File

@@ -0,0 +1,16 @@
#!/usr/bin/make -f
%:
dh $@ --with python2,sphinxdoc
override_dh_auto_build:
dh_auto_build
python setup.py doc
ifeq (,$(filter nocheck,$(DEB_BUILD_OPTIONS)))
override_dh_auto_test:
python setup.py gevent_nosetests
endif
override_dh_strip:
dh_strip --dbg-package=python-cassandra-driver-dbg

1
debian/source/format vendored Normal file
View File

@@ -0,0 +1 @@
3.0 (quilt)

View File

@@ -0,0 +1,8 @@
``cassandra.concurrent`` - Utilities for Concurrent Statement Execution
=======================================================================
.. module:: cassandra.concurrent
.. autofunction:: execute_concurrent
.. autofunction:: execute_concurrent_with_args

View File

@@ -10,6 +10,7 @@ API Documentation
cassandra/metadata
cassandra/query
cassandra/pool
cassandra/concurrent
cassandra/connection
cassandra/io/asyncorereactor
cassandra/io/libevreactor

16
example.py Executable file → Normal file
View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python
import logging
@@ -56,7 +70,7 @@ def main():
for i in range(10):
log.info("inserting row %d" % i)
session.execute(query, dict(key="key%d" % i, a='a', b='b'))
session.execute(prepared.bind(("key%d" % i, 'b', 'b')))
session.execute(prepared, ("key%d" % i, 'b', 'b'))
future = session.execute_async("SELECT * FROM mytable")
log.info("key\tcol1\tcol2")

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import ez_setup

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
log = logging.getLogger()

View File

@@ -1,3 +1,20 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import traceback
try:
import unittest2 as unittest
except ImportError:
@@ -17,6 +34,7 @@ except ImportError as e:
raise unittest.SkipTest('ccm is a dependency for integration tests:', e)
CLUSTER_NAME = 'test_cluster'
MULTIDC_CLUSTER_NAME = 'multidc_test_cluster'
CCM_CLUSTER = None
CASSANDRA_VERSION = os.getenv('CASSANDRA_VERSION', '2.0.6')
@@ -99,7 +117,43 @@ def setup_package():
setup_test_keyspace()
def use_multidc(dc_list):
teardown_package()
try:
try:
cluster = CCMCluster.load(path, MULTIDC_CLUSTER_NAME)
log.debug("Found existing ccm test multi-dc cluster, clearing")
cluster.clear()
except Exception:
log.debug("Creating new ccm test multi-dc cluster")
cluster = CCMCluster(path, MULTIDC_CLUSTER_NAME, cassandra_version=CASSANDRA_VERSION)
cluster.set_configuration_options({'start_native_transport': True})
common.switch_cluster(path, MULTIDC_CLUSTER_NAME)
cluster.populate(dc_list)
log.debug("Starting ccm test cluster")
cluster.start(wait_for_binary_proto=True)
except Exception:
log.exception("Failed to start ccm cluster:")
raise
global CCM_CLUSTER
CCM_CLUSTER = cluster
setup_test_keyspace()
log.debug("Switched to multidc cluster")
def use_singledc():
teardown_package()
setup_package()
log.debug("Switched to singledc cluster")
def setup_test_keyspace():
# wait for nodes to startup
time.sleep(10)
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
@@ -130,16 +184,27 @@ def setup_test_keyspace():
k int PRIMARY KEY,
v int )'''
session.execute(ddl)
except Exception:
traceback.print_exc()
raise
finally:
cluster.shutdown()
def teardown_package():
if CCM_CLUSTER:
for cluster_name in [CLUSTER_NAME, MULTIDC_CLUSTER_NAME]:
try:
CCM_CLUSTER.clear()
cluster = CCMCluster.load(path, cluster_name)
try:
cluster.clear()
cluster.remove()
log.info('Cleared cluster: %s' % cluster_name)
except Exception:
log.exception('Failed to clear cluster: %s' % cluster_name)
except Exception:
log.exception("Failed to clear cluster")
log.warn('Did not find cluster: %s' % cluster_name)
class UpDownWaiter(object):

View File

@@ -0,0 +1,14 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
import traceback

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import Queue
from struct import pack
import unittest
@@ -21,6 +35,9 @@ def create_column_name(i):
if not i:
break
if column_name == 'if':
column_name = 'special_case'
return column_name
@@ -143,17 +160,18 @@ class LargeDataTests(unittest.TestCase):
def test_wide_table(self):
table = 'wide_table'
table_width = 330
session = self.make_session_and_keyspace()
table_declaration = 'CREATE TABLE %s (key INT PRIMARY KEY, '
table_declaration += ' INT, '.join(create_column_name(i) for i in range(330))
table_declaration += ' INT, '.join(create_column_name(i) for i in range(table_width))
table_declaration += ' INT)'
session.execute(table_declaration % table)
# Write
insert_statement = 'INSERT INTO %s (key, '
insert_statement += ', '.join(create_column_name(i) for i in range(330))
insert_statement += ', '.join(create_column_name(i) for i in range(table_width))
insert_statement += ') VALUES (%s, '
insert_statement += ', '.join(str(i) for i in range(330))
insert_statement += ', '.join(str(i) for i in range(table_width))
insert_statement += ')'
insert_statement = insert_statement % (table, 0)
@@ -164,5 +182,5 @@ class LargeDataTests(unittest.TestCase):
# Verify
for row in result:
for i in range(330):
for i in range(table_width):
self.assertEqual(row[create_column_name(i)], i)

View File

@@ -0,0 +1,465 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
from cassandra import ConsistencyLevel, Unavailable
from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.policies import (RoundRobinPolicy, DCAwareRoundRobinPolicy,
TokenAwarePolicy, WhiteListRoundRobinPolicy)
from cassandra.query import SimpleStatement
from tests.integration import use_multidc, use_singledc, PROTOCOL_VERSION
from tests.integration.long.utils import (wait_for_up, create_schema,
CoordinatorStats, force_stop,
wait_for_down, decommission, start,
bootstrap, stop, IP_FORMAT)
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
class LoadBalancingPolicyTests(unittest.TestCase):
def setUp(self):
self.coordinator_stats = CoordinatorStats()
self.prepared = None
@classmethod
def tearDownClass(cls):
use_singledc()
def _insert(self, session, keyspace, count=12,
consistency_level=ConsistencyLevel.ONE):
session.execute('USE %s' % keyspace)
ss = SimpleStatement('INSERT INTO cf(k, i) VALUES (0, 0)',
consistency_level=consistency_level)
execute_concurrent_with_args(session, ss, [None] * count)
def _query(self, session, keyspace, count=12,
consistency_level=ConsistencyLevel.ONE, use_prepared=False):
if use_prepared:
query_string = 'SELECT * FROM %s.cf WHERE k = ?' % keyspace
if not self.prepared or self.prepared.query_string != query_string:
self.prepared = session.prepare(query_string)
for i in range(count):
self.coordinator_stats.add_coordinator(session.execute_async(self.prepared.bind((0,))))
else:
routing_key = struct.pack('>i', 0)
for i in range(count):
ss = SimpleStatement('SELECT * FROM %s.cf WHERE k = 0' % keyspace,
consistency_level=consistency_level,
routing_key=routing_key)
self.coordinator_stats.add_coordinator(session.execute_async(ss))
def test_roundrobin(self):
use_singledc()
keyspace = 'test_roundrobin'
cluster = Cluster(
load_balancing_policy=RoundRobinPolicy(),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3)
create_schema(session, keyspace, replication_factor=3)
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 4)
self.coordinator_stats.assert_query_count_equals(self, 2, 4)
self.coordinator_stats.assert_query_count_equals(self, 3, 4)
force_stop(3)
wait_for_down(cluster, 3)
self.coordinator_stats.reset_counts()
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 6)
self.coordinator_stats.assert_query_count_equals(self, 2, 6)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
decommission(1)
start(3)
wait_for_down(cluster, 1)
wait_for_up(cluster, 3)
self.coordinator_stats.reset_counts()
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 6)
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
def test_roundrobin_two_dcs(self):
use_multidc([2, 2])
keyspace = 'test_roundrobin_two_dcs'
cluster = Cluster(
load_balancing_policy=RoundRobinPolicy(),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3, wait=False)
wait_for_up(cluster, 4)
create_schema(session, keyspace, replication_strategy=[2, 2])
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 3)
self.coordinator_stats.assert_query_count_equals(self, 2, 3)
self.coordinator_stats.assert_query_count_equals(self, 3, 3)
self.coordinator_stats.assert_query_count_equals(self, 4, 3)
force_stop(1)
bootstrap(5, 'dc3')
# reset control connection
self._insert(session, keyspace, count=1000)
wait_for_up(cluster, 5)
self.coordinator_stats.reset_counts()
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 3)
self.coordinator_stats.assert_query_count_equals(self, 3, 3)
self.coordinator_stats.assert_query_count_equals(self, 4, 3)
self.coordinator_stats.assert_query_count_equals(self, 5, 3)
def test_roundrobin_two_dcs_2(self):
use_multidc([2, 2])
keyspace = 'test_roundrobin_two_dcs_2'
cluster = Cluster(
load_balancing_policy=RoundRobinPolicy(),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3, wait=False)
wait_for_up(cluster, 4)
create_schema(session, keyspace, replication_strategy=[2, 2])
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 3)
self.coordinator_stats.assert_query_count_equals(self, 2, 3)
self.coordinator_stats.assert_query_count_equals(self, 3, 3)
self.coordinator_stats.assert_query_count_equals(self, 4, 3)
force_stop(1)
bootstrap(5, 'dc1')
# reset control connection
self._insert(session, keyspace, count=1000)
wait_for_up(cluster, 5)
self.coordinator_stats.reset_counts()
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 3)
self.coordinator_stats.assert_query_count_equals(self, 3, 3)
self.coordinator_stats.assert_query_count_equals(self, 4, 3)
self.coordinator_stats.assert_query_count_equals(self, 5, 3)
def test_dc_aware_roundrobin_two_dcs(self):
use_multidc([3, 2])
keyspace = 'test_dc_aware_roundrobin_two_dcs'
cluster = Cluster(
load_balancing_policy=DCAwareRoundRobinPolicy('dc1'),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3, wait=False)
wait_for_up(cluster, 4, wait=False)
wait_for_up(cluster, 5)
create_schema(session, keyspace, replication_strategy=[2, 2])
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 4)
self.coordinator_stats.assert_query_count_equals(self, 2, 4)
self.coordinator_stats.assert_query_count_equals(self, 3, 4)
self.coordinator_stats.assert_query_count_equals(self, 4, 0)
self.coordinator_stats.assert_query_count_equals(self, 5, 0)
def test_dc_aware_roundrobin_two_dcs_2(self):
use_multidc([3, 2])
keyspace = 'test_dc_aware_roundrobin_two_dcs_2'
cluster = Cluster(
load_balancing_policy=DCAwareRoundRobinPolicy('dc2'),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3, wait=False)
wait_for_up(cluster, 4, wait=False)
wait_for_up(cluster, 5)
create_schema(session, keyspace, replication_strategy=[2, 2])
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.assert_query_count_equals(self, 4, 6)
self.coordinator_stats.assert_query_count_equals(self, 5, 6)
def test_dc_aware_roundrobin_one_remote_host(self):
use_multidc([2, 2])
keyspace = 'test_dc_aware_roundrobin_one_remote_host'
cluster = Cluster(
load_balancing_policy=DCAwareRoundRobinPolicy('dc2', used_hosts_per_remote_dc=1),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3, wait=False)
wait_for_up(cluster, 4)
create_schema(session, keyspace, replication_strategy=[2, 2])
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
self.coordinator_stats.assert_query_count_equals(self, 4, 6)
self.coordinator_stats.reset_counts()
bootstrap(5, 'dc1')
wait_for_up(cluster, 5)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
self.coordinator_stats.assert_query_count_equals(self, 4, 6)
self.coordinator_stats.assert_query_count_equals(self, 5, 0)
self.coordinator_stats.reset_counts()
decommission(3)
decommission(4)
wait_for_down(cluster, 3, wait=True)
wait_for_down(cluster, 4, wait=True)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.assert_query_count_equals(self, 4, 0)
responses = set()
for node in [1, 2, 5]:
responses.add(self.coordinator_stats.get_query_count(node))
self.assertEqual(set([0, 0, 12]), responses)
self.coordinator_stats.reset_counts()
decommission(5)
wait_for_down(cluster, 5, wait=True)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.assert_query_count_equals(self, 4, 0)
self.coordinator_stats.assert_query_count_equals(self, 5, 0)
responses = set()
for node in [1, 2]:
responses.add(self.coordinator_stats.get_query_count(node))
self.assertEqual(set([0, 12]), responses)
self.coordinator_stats.reset_counts()
decommission(1)
wait_for_down(cluster, 1, wait=True)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.assert_query_count_equals(self, 4, 0)
self.coordinator_stats.assert_query_count_equals(self, 5, 0)
self.coordinator_stats.reset_counts()
force_stop(2)
try:
self._query(session, keyspace)
self.fail()
except NoHostAvailable:
pass
def test_token_aware(self):
keyspace = 'test_token_aware'
self.token_aware(keyspace)
def test_token_aware_prepared(self):
keyspace = 'test_token_aware_prepared'
self.token_aware(keyspace, True)
def token_aware(self, keyspace, use_prepared=False):
use_singledc()
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3)
create_schema(session, keyspace, replication_factor=1)
self._insert(session, keyspace)
self._query(session, keyspace, use_prepared=use_prepared)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.reset_counts()
self._query(session, keyspace, use_prepared=use_prepared)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.reset_counts()
force_stop(2)
wait_for_down(cluster, 2, wait=True)
try:
self._query(session, keyspace, use_prepared=use_prepared)
self.fail()
except Unavailable as e:
self.assertEqual(e.consistency, 1)
self.assertEqual(e.required_replicas, 1)
self.assertEqual(e.alive_replicas, 0)
self.coordinator_stats.reset_counts()
start(2)
wait_for_up(cluster, 2, wait=True)
self._query(session, keyspace, use_prepared=use_prepared)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.reset_counts()
decommission(2)
wait_for_down(cluster, 2, wait=True)
self._query(session, keyspace, use_prepared=use_prepared)
self.coordinator_stats.assert_query_count_equals(self, 1, 6)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 6)
def test_token_aware_composite_key(self):
use_singledc()
keyspace = 'test_token_aware_composite_key'
table = 'composite'
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3)
create_schema(session, keyspace, replication_factor=2)
session.execute('CREATE TABLE %s ('
'k1 int, '
'k2 int, '
'i int, '
'PRIMARY KEY ((k1, k2)))' % table)
prepared = session.prepare('INSERT INTO %s '
'(k1, k2, i) '
'VALUES '
'(?, ?, ?)' % table)
session.execute(prepared.bind((1, 2, 3)))
results = session.execute('SELECT * FROM %s WHERE k1 = 1 AND k2 = 2' % table)
self.assertTrue(len(results) == 1)
self.assertTrue(results[0].i)
def test_token_aware_with_rf_2(self, use_prepared=False):
use_singledc()
keyspace = 'test_token_aware_with_rf_2'
cluster = Cluster(
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3)
create_schema(session, keyspace, replication_factor=2)
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.reset_counts()
stop(2)
wait_for_down(cluster, 2, wait=True)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 0)
self.coordinator_stats.assert_query_count_equals(self, 3, 12)
def test_white_list(self):
use_singledc()
keyspace = 'test_white_list'
cluster = Cluster(('127.0.0.2',),
load_balancing_policy=WhiteListRoundRobinPolicy((IP_FORMAT % 2,)),
protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
wait_for_up(cluster, 1, wait=False)
wait_for_up(cluster, 2, wait=False)
wait_for_up(cluster, 3)
create_schema(session, keyspace)
self._insert(session, keyspace)
self._query(session, keyspace)
self.coordinator_stats.assert_query_count_equals(self, 1, 0)
self.coordinator_stats.assert_query_count_equals(self, 2, 12)
self.coordinator_stats.assert_query_count_equals(self, 3, 0)
self.coordinator_stats.reset_counts()
decommission(2)
wait_for_down(cluster, 2, wait=True)
try:
self._query(session, keyspace)
self.fail()
except NoHostAvailable:
pass

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from cassandra import ConsistencyLevel

View File

@@ -1,12 +1,28 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from collections import defaultdict
from ccmlib.node import Node
from cassandra.query import named_tuple_factory
from tests.integration import get_node
from tests.integration import get_node, get_cluster
IP_FORMAT = '127.0.0.%s'
log = logging.getLogger(__name__)
@@ -17,19 +33,23 @@ class CoordinatorStats():
self.coordinator_counts = defaultdict(int)
def add_coordinator(self, future):
future.result()
coordinator = future._current_host.address
self.coordinator_counts[coordinator] += 1
if future._errors:
log.error('future._errors: %s', future._errors)
future.result()
def reset_counts(self):
self.coordinator_counts = defaultdict(int)
def get_query_count(self, node):
ip = '127.0.0.%d' % node
return self.coordinator_counts[ip]
def assert_query_count_equals(self, testcase, node, expected):
ip = '127.0.0.%d' % node
if self.coordinator_counts[ip] != expected:
if self.get_query_count(node) != expected:
testcase.fail('Expected %d queries to %s, but got %d. Query counts: %s' % (
expected, ip, self.coordinator_counts[ip], dict(self.coordinator_counts)))
@@ -78,6 +98,32 @@ def force_stop(node):
log.debug("Node %s was stopped", node)
def decommission(node):
get_node(node).decommission()
get_node(node).stop()
def bootstrap(node, data_center=None, token=None):
node_instance = Node('node%s' % node,
get_cluster(),
auto_bootstrap=False,
thrift_interface=(IP_FORMAT % node, 9160),
storage_interface=(IP_FORMAT % node, 7000),
jmx_port=str(7000 + 100 * node),
remote_debug_port=0,
initial_token=token if token else node * 10)
get_cluster().add(node_instance, is_seed=False, data_center=data_center)
try:
start(node)
except:
# Try only twice
try:
start(node)
except:
log.error('Added node failed to start twice.')
def ring(node):
print 'From node%s:' % node
get_node(node).nodetool('ring')
@@ -85,24 +131,26 @@ def ring(node):
def wait_for_up(cluster, node, wait=True):
while True:
host = cluster.metadata.get_host('127.0.0.%s' % node)
host = cluster.metadata.get_host(IP_FORMAT % node)
time.sleep(0.1)
if host and host.is_up:
# BUG: shouldn't have to, but we do
if wait:
time.sleep(5)
log.debug("Sleeping 30s until host is up")
time.sleep(30)
log.debug("Done waiting for node %s to be up", node)
return
def wait_for_down(cluster, node, wait=True):
log.debug("Waiting for node %s to be down", node)
while True:
host = cluster.metadata.get_host('127.0.0.%s' % node)
host = cluster.metadata.get_host(IP_FORMAT % node)
time.sleep(0.1)
if not host or not host.is_up:
# BUG: shouldn't have to, but we do
if wait:
log.debug("Sleeping 5s until host is up")
time.sleep(5)
log.debug("Sleeping 10s until host is down")
time.sleep(10)
log.debug("Done waiting for node %s to be down", node)
return

View File

@@ -0,0 +1,14 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests.integration import PROTOCOL_VERSION
try:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests.integration import PROTOCOL_VERSION
try:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests.integration import PROTOCOL_VERSION
try:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests.integration import PROTOCOL_VERSION
try:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
@@ -402,7 +416,7 @@ class TestCodeCoverage(unittest.TestCase):
get_replicas = cluster.metadata.token_map.get_replicas
for ksname in ('test1rf', 'test2rf', 'test3rf'):
self.assertNotEqual(list(get_replicas('test3rf', ring[0])), [])
self.assertNotEqual(list(get_replicas(ksname, ring[0])), [])
for i, token in enumerate(ring):
self.assertEqual(set(get_replicas('test3rf', token)), set(owners))

View File

@@ -1,3 +1,19 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
try:
import unittest2 as unittest
except ImportError:
@@ -122,6 +138,7 @@ class MetricsTests(unittest.TestCase):
# Force kill ccm node
get_node(1).stop(wait=True, gently=True)
time.sleep(5)
try:
# Test write

View File

@@ -1,3 +1,18 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests.integration import PROTOCOL_VERSION
try:
@@ -9,6 +24,7 @@ from cassandra import InvalidRequest
from cassandra.cluster import Cluster
from cassandra.query import PreparedStatement
class PreparedStatementTests(unittest.TestCase):
def test_basic(self):
@@ -55,6 +71,32 @@ class PreparedStatementTests(unittest.TestCase):
results = session.execute(bound)
self.assertEquals(results, [('a', 'b', 'c')])
# test with new dict binding
prepared = session.prepare(
"""
INSERT INTO cf0 (a, b, c) VALUES (?, ?, ?)
""")
self.assertIsInstance(prepared, PreparedStatement)
bound = prepared.bind({
'a': 'x',
'b': 'y',
'c': 'z'
})
session.execute(bound)
prepared = session.prepare(
"""
SELECT * FROM cf0 WHERE a=?
""")
self.assertIsInstance(prepared, PreparedStatement)
bound = prepared.bind({'a': 'x'})
results = session.execute(bound)
self.assertEquals(results, [('x', 'y', 'z')])
def test_missing_primary_key(self):
"""
Ensure an InvalidRequest is thrown
@@ -73,6 +115,25 @@ class PreparedStatementTests(unittest.TestCase):
bound = prepared.bind((1,))
self.assertRaises(InvalidRequest, session.execute, bound)
def test_missing_primary_key_dicts(self):
"""
Ensure an InvalidRequest is thrown
when prepared statements are missing the primary key
with dict bindings
"""
cluster = Cluster()
session = cluster.connect()
prepared = session.prepare(
"""
INSERT INTO test3rf.test (v) VALUES (?)
""")
self.assertIsInstance(prepared, PreparedStatement)
bound = prepared.bind({'v': 1})
self.assertRaises(InvalidRequest, session.execute, bound)
def test_too_many_bind_values(self):
"""
Ensure a ValueError is thrown when attempting to bind too many variables
@@ -89,6 +150,27 @@ class PreparedStatementTests(unittest.TestCase):
self.assertIsInstance(prepared, PreparedStatement)
self.assertRaises(ValueError, prepared.bind, (1,2))
def test_too_many_bind_values_dicts(self):
"""
Ensure a ValueError is thrown when attempting to bind too many variables
with dict bindings
"""
cluster = Cluster()
session = cluster.connect()
prepared = session.prepare(
"""
INSERT INTO test3rf.test (v) VALUES (?)
""")
self.assertIsInstance(prepared, PreparedStatement)
self.assertRaises(ValueError, prepared.bind, {'k': 1, 'v': 2})
# also catch too few variables with dicts
self.assertIsInstance(prepared, PreparedStatement)
self.assertRaises(KeyError, prepared.bind, {})
def test_none_values(self):
"""
Ensure binding None is handled correctly
@@ -116,6 +198,35 @@ class PreparedStatementTests(unittest.TestCase):
results = session.execute(bound)
self.assertEquals(results[0].v, None)
def test_none_values_dicts(self):
"""
Ensure binding None is handled correctly with dict bindings
"""
cluster = Cluster()
session = cluster.connect()
# test with new dict binding
prepared = session.prepare(
"""
INSERT INTO test3rf.test (k, v) VALUES (?, ?)
""")
self.assertIsInstance(prepared, PreparedStatement)
bound = prepared.bind({'k': 1, 'v': None})
session.execute(bound)
prepared = session.prepare(
"""
SELECT * FROM test3rf.test WHERE k=?
""")
self.assertIsInstance(prepared, PreparedStatement)
bound = prepared.bind({'k': 1})
results = session.execute(bound)
self.assertEquals(results[0].v, None)
def test_async_binding(self):
"""
Ensure None binding over async queries
@@ -142,3 +253,31 @@ class PreparedStatementTests(unittest.TestCase):
future = session.execute_async(prepared, (873,))
results = future.result()
self.assertEquals(results[0].v, None)
def test_async_binding_dicts(self):
"""
Ensure None binding over async queries with dict bindings
"""
cluster = Cluster()
session = cluster.connect()
prepared = session.prepare(
"""
INSERT INTO test3rf.test (k, v) VALUES (?, ?)
""")
self.assertIsInstance(prepared, PreparedStatement)
future = session.execute_async(prepared, {'k': 873, 'v': None})
future.result()
prepared = session.prepare(
"""
SELECT * FROM test3rf.test WHERE k=?
""")
self.assertIsInstance(prepared, PreparedStatement)
future = session.execute_async(prepared, {'k': 873})
results = future.result()
self.assertEquals(results[0].v, None)

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests.integration import PROTOCOL_VERSION
import logging

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -0,0 +1,14 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,14 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
@@ -90,13 +104,12 @@ class MockConnection(object):
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"]],
["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"]]]
]
def wait_for_responses(self, peer_query, local_query, timeout=None):
local_response = ResultMessage(
kind=RESULT_KIND_ROWS, results=self.local_results)
peer_response = ResultMessage(
kind=RESULT_KIND_ROWS, results=self.peer_results)
return (peer_response, local_response)
self.wait_for_responses = Mock(return_value=(peer_response, local_response))
class FakeTime(object):
@@ -122,6 +135,38 @@ class ControlConnectionTest(unittest.TestCase):
self.control_connection._connection = self.connection
self.control_connection._time = self.time
def _get_matching_schema_preloaded_results(self):
local_results = [
["schema_version", "cluster_name", "data_center", "rack", "partitioner", "tokens"],
[["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", ["0", "100", "200"]]]
]
local_response = ResultMessage(kind=RESULT_KIND_ROWS, results=local_results)
peer_results = [
["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens"],
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"]],
["192.168.1.2", "10.0.0.2", "a", "dc1", "rack1", ["2", "102", "202"]]]
]
peer_response = ResultMessage(kind=RESULT_KIND_ROWS, results=peer_results)
return (peer_response, local_response)
def _get_nonmatching_schema_preloaded_results(self):
local_results = [
["schema_version", "cluster_name", "data_center", "rack", "partitioner", "tokens"],
[["a", "foocluster", "dc1", "rack1", "Murmur3Partitioner", ["0", "100", "200"]]]
]
local_response = ResultMessage(kind=RESULT_KIND_ROWS, results=local_results)
peer_results = [
["rpc_address", "peer", "schema_version", "data_center", "rack", "tokens"],
[["192.168.1.1", "10.0.0.1", "a", "dc1", "rack1", ["1", "101", "201"]],
["192.168.1.2", "10.0.0.2", "b", "dc1", "rack1", ["2", "102", "202"]]]
]
peer_response = ResultMessage(kind=RESULT_KIND_ROWS, results=peer_results)
return (peer_response, local_response)
def test_wait_for_schema_agreement(self):
"""
Basic test with all schema versions agreeing
@@ -130,6 +175,29 @@ class ControlConnectionTest(unittest.TestCase):
# the control connection should not have slept at all
self.assertEqual(self.time.clock, 0)
def test_wait_for_schema_agreement_uses_preloaded_results_if_given(self):
"""
wait_for_schema_agreement uses preloaded results if given for shared table queries
"""
preloaded_results = self._get_matching_schema_preloaded_results()
self.assertTrue(self.control_connection.wait_for_schema_agreement(preloaded_results=preloaded_results))
# the control connection should not have slept at all
self.assertEqual(self.time.clock, 0)
# the connection should not have made any queries if given preloaded results
self.assertEqual(self.connection.wait_for_responses.call_count, 0)
def test_wait_for_schema_agreement_falls_back_to_querying_if_schemas_dont_match_preloaded_result(self):
"""
wait_for_schema_agreement requery if schema does not match using preloaded results
"""
preloaded_results = self._get_nonmatching_schema_preloaded_results()
self.assertTrue(self.control_connection.wait_for_schema_agreement(preloaded_results=preloaded_results))
# the control connection should not have slept at all
self.assertEqual(self.time.clock, 0)
self.assertEqual(self.connection.wait_for_responses.call_count, 1)
def test_wait_for_schema_agreement_fails(self):
"""
Make sure the control connection sleeps and retries
@@ -197,6 +265,32 @@ class ControlConnectionTest(unittest.TestCase):
self.assertEqual(host.datacenter, "dc1")
self.assertEqual(host.rack, "rack1")
self.assertEqual(self.connection.wait_for_responses.call_count, 1)
def test_refresh_nodes_and_tokens_uses_preloaded_results_if_given(self):
"""
refresh_nodes_and_tokens uses preloaded results if given for shared table queries
"""
preloaded_results = self._get_matching_schema_preloaded_results()
self.control_connection._refresh_node_list_and_token_map(self.connection, preloaded_results=preloaded_results)
meta = self.cluster.metadata
self.assertEqual(meta.partitioner, 'Murmur3Partitioner')
self.assertEqual(meta.cluster_name, 'foocluster')
# check token map
self.assertEqual(sorted(meta.all_hosts()), sorted(meta.token_map.keys()))
for token_list in meta.token_map.values():
self.assertEqual(3, len(token_list))
# check datacenter/rack
for host in meta.all_hosts():
self.assertEqual(host.datacenter, "dc1")
self.assertEqual(host.rack, "rack1")
# the connection should not have made any queries if given preloaded results
self.assertEqual(self.connection.wait_for_responses.call_count, 0)
def test_refresh_nodes_and_tokens_no_partitioner(self):
"""
Test handling of an unknown partitioner.

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:

View File

@@ -1,3 +1,17 @@
# Copyright 2013-2014 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError: