Merge pull request #300 from datastax/PYTHON-160

PYTHON-160 - Token-, DC-Aware default load balancing
This commit is contained in:
Adam Holmberg
2015-05-19 13:40:52 -05:00
2 changed files with 50 additions and 24 deletions

View File

@@ -62,7 +62,7 @@ from cassandra.protocol import (QueryMessage, ResultMessage,
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
RESULT_KIND_SCHEMA_CHANGE)
from cassandra.metadata import Metadata, protect_name
from cassandra.policies import (RoundRobinPolicy, SimpleConvictionPolicy,
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
RetryPolicy)
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
@@ -169,6 +169,16 @@ def _shutdown_cluster(cluster):
cluster.shutdown()
# The murmur3 token-hashing extension required by TokenAwarePolicy is only
# shipped for CPython, so pick the richest default policy the runtime supports.
import platform

_HAS_MURMUR3 = platform.python_implementation() == 'CPython'


def default_lbp_factory():
    """Build the default load-balancing policy for this interpreter.

    Returns a token-aware, DC-aware round-robin policy on CPython (where
    the murmur3 extension is available), and a plain DC-aware round-robin
    policy on other interpreters.
    """
    if _HAS_MURMUR3:
        return TokenAwarePolicy(DCAwareRoundRobinPolicy())
    return DCAwareRoundRobinPolicy()
class Cluster(object):
"""
The main class to use when interacting with a Cassandra cluster.
@@ -193,9 +203,9 @@ class Cluster(object):
Defaults to loopback interface.
Note: When using :class:`.DCAwareRoundRobinPolicy` with no explicit
local_dc set, the DC is chosen from an arbitrary host in contact_points.
In this case, contact_points should contain only nodes from a single,
local DC.
local_dc set (as is the default), the DC is chosen from an arbitrary
host in contact_points. In this case, contact_points should contain
only nodes from a single, local DC.
"""
port = 9042
@@ -289,7 +299,16 @@ class Cluster(object):
load_balancing_policy = None
"""
An instance of :class:`.policies.LoadBalancingPolicy` or
one of its subclasses. Defaults to :class:`~.RoundRobinPolicy`.
one of its subclasses.
.. versionchanged:: 2.6.0
Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`)
when using CPython (where the murmur3 extension is available); :class:`~.DCAwareRoundRobinPolicy`
otherwise. The default local DC will be chosen from the contact points.
**Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
DC locality and remote nodes.**
"""
reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
@@ -319,6 +338,8 @@ class Cluster(object):
by the :attr:`~.Cluster.load_balancing_policy` will have a connection
opened to them. Otherwise, they will not have a connection opened to them.
Note that the default load balancing policy ignores remote hosts by default.
.. versionadded:: 2.1.0
"""
@@ -503,7 +524,7 @@ class Cluster(object):
self.load_balancing_policy = load_balancing_policy
else:
self.load_balancing_policy = RoundRobinPolicy()
self.load_balancing_policy = default_lbp_factory()
if reconnection_policy is not None:
if isinstance(reconnection_policy, type):
@@ -2114,20 +2135,21 @@ class ControlConnection(object):
return None
def shutdown(self):
# stop trying to reconnect (if we are)
with self._reconnection_lock:
if self._reconnection_handler:
self._reconnection_handler.cancel()
with self._lock:
if self._is_shutdown:
return
else:
self._is_shutdown = True
log.debug("Shutting down control connection")
# stop trying to reconnect (if we are)
if self._reconnection_handler:
self._reconnection_handler.cancel()
if self._connection:
self._connection.close()
del self._connection
log.debug("Shutting down control connection")
if self._connection:
self._connection.close()
del self._connection
def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None,
aggregate=None, schema_agreement_wait=None):
@@ -2545,17 +2567,21 @@ class ControlConnection(object):
return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
def _signal_error(self):
# try just signaling the cluster, as this will trigger a reconnect
# as part of marking the host down
if self._connection and self._connection.is_defunct:
host = self._cluster.metadata.get_host(self._connection.host)
# host may be None if it's already been removed, but that indicates
# that errors have already been reported, so we're fine
if host:
self._cluster.signal_connection_failure(
host, self._connection.last_error, is_host_addition=False)
with self._lock:
if self._is_shutdown:
return
# try just signaling the cluster, as this will trigger a reconnect
# as part of marking the host down
if self._connection and self._connection.is_defunct:
host = self._cluster.metadata.get_host(self._connection.host)
# host may be None if it's already been removed, but that indicates
# that errors have already been reported, so we're fine
if host:
self._cluster.signal_connection_failure(
host, self._connection.last_error, is_host_addition=False)
return
# if the connection is not defunct or the host already left, reconnect
# manually
self.reconnect()

View File

@@ -232,7 +232,7 @@ def teardown_package():
log.exception('Failed to remove cluster: %s' % cluster_name)
except Exception:
log.warn('Did not find cluster: %s' % cluster_name)
log.warning('Did not find cluster: %s' % cluster_name)
def setup_keyspace(ipformat=None):