diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d37433cf..07222d84 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -62,7 +62,7 @@ from cassandra.protocol import (QueryMessage, ResultMessage, RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS, RESULT_KIND_SCHEMA_CHANGE) from cassandra.metadata import Metadata, protect_name -from cassandra.policies import (RoundRobinPolicy, SimpleConvictionPolicy, +from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, ExponentialReconnectionPolicy, HostDistance, RetryPolicy) from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler, @@ -169,6 +169,16 @@ def _shutdown_cluster(cluster): cluster.shutdown() +# murmur3 implementation required for TokenAware is only available for CPython +import platform +if platform.python_implementation() == 'CPython': + def default_lbp_factory(): + return TokenAwarePolicy(DCAwareRoundRobinPolicy()) +else: + def default_lbp_factory(): + return DCAwareRoundRobinPolicy() + + class Cluster(object): """ The main class to use when interacting with a Cassandra cluster. @@ -193,9 +203,9 @@ class Cluster(object): Defaults to loopback interface. Note: When using :class:`.DCAwareLoadBalancingPolicy` with no explicit - local_dc set, the DC is chosen from an arbitrary host in contact_points. - In this case, contact_points should contain only nodes from a single, - local DC. + local_dc set (as is the default), the DC is chosen from an arbitrary + host in contact_points. In this case, contact_points should contain + only nodes from a single, local DC. """ port = 9042 @@ -289,7 +299,16 @@ class Cluster(object): load_balancing_policy = None """ An instance of :class:`.policies.LoadBalancingPolicy` or - one of its subclasses. Defaults to :class:`~.RoundRobinPolicy`. + one of its subclasses. + + .. versionchanged:: 2.6.0 + + Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`). + when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy` + otherwise. Default local DC will be chosen from contact points. + + **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to + DC locality and remote nodes.** """ reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0) @@ -319,6 +338,8 @@ class Cluster(object): by the :attr:`~.Cluster.load_balancing_policy` will have a connection opened to them. Otherwise, they will not have a connection opened to them. + Note that the default load balancing policy ignores remote hosts by default. + .. versionadded:: 2.1.0 """ @@ -503,7 +524,7 @@ class Cluster(object): self.load_balancing_policy = load_balancing_policy else: - self.load_balancing_policy = RoundRobinPolicy() + self.load_balancing_policy = default_lbp_factory() if reconnection_policy is not None: if isinstance(reconnection_policy, type): @@ -2114,20 +2135,21 @@ class ControlConnection(object): return None def shutdown(self): + # stop trying to reconnect (if we are) + with self._reconnection_lock: + if self._reconnection_handler: + self._reconnection_handler.cancel() + with self._lock: if self._is_shutdown: return else: self._is_shutdown = True - log.debug("Shutting down control connection") - # stop trying to reconnect (if we are) - if self._reconnection_handler: - self._reconnection_handler.cancel() - - if self._connection: - self._connection.close() - del self._connection + log.debug("Shutting down control connection") + if self._connection: + self._connection.close() + del self._connection def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None, aggregate=None, schema_agreement_wait=None): @@ -2545,17 +2567,21 @@ class ControlConnection(object): return dict((version, list(nodes)) for version, nodes in six.iteritems(versions)) def _signal_error(self): - # try just signaling the cluster, as this will trigger a reconnect - # as part of marking the host down - if self._connection and self._connection.is_defunct: - host = self._cluster.metadata.get_host(self._connection.host) - # host may be None if it's already been removed, but that indicates - # that errors have already been reported, so we're fine - if host: - self._cluster.signal_connection_failure( - host, self._connection.last_error, is_host_addition=False) + with self._lock: + if self._is_shutdown: return + # try just signaling the cluster, as this will trigger a reconnect + # as part of marking the host down + if self._connection and self._connection.is_defunct: + host = self._cluster.metadata.get_host(self._connection.host) + # host may be None if it's already been removed, but that indicates + # that errors have already been reported, so we're fine + if host: + self._cluster.signal_connection_failure( + host, self._connection.last_error, is_host_addition=False) + return + # if the connection is not defunct or the host already left, reconnect # manually self.reconnect() diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index e3f91cf1..887498e3 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -232,7 +232,7 @@ def teardown_package(): log.exception('Failed to remove cluster: %s' % cluster_name) except Exception: - log.warn('Did not find cluster: %s' % cluster_name) + log.warning('Did not find cluster: %s' % cluster_name) def setup_keyspace(ipformat=None):