Make load balancing policy not a factory, docs

This commit is contained in:
Tyler Hobbs
2013-07-03 12:30:34 -05:00
parent f29f06dbee
commit b71848ff04
8 changed files with 116 additions and 36 deletions

View File

@@ -165,7 +165,7 @@ class AlreadyExists(Exception):
table = None table = None
""" """
The name of the table that already exists, or, if an attempt was The name of the table that already exists, or, if an attempt was
make to create a keyspace, ``None``. make to create a keyspace, :const:`None`.
""" """
def __init__(self, keyspace=None, table=None): def __init__(self, keyspace=None, table=None):

View File

@@ -90,7 +90,7 @@ class Cluster(object):
compression = True compression = True
""" """
Whether or not compression should be enabled when possible. Defaults to Whether or not compression should be enabled when possible. Defaults to
``True`` and attempts to use snappy compression. :const:`True` and attempts to use snappy compression.
""" """
auth_provider = None auth_provider = None
@@ -99,11 +99,10 @@ class Cluster(object):
and returns a dict of credentials for that node. and returns a dict of credentials for that node.
""" """
load_balancing_policy_factory = RoundRobinPolicy load_balancing_policy = RoundRobinPolicy()
""" """
A factory function which creates instances of subclasses of An instance of :class:`.policies.LoadBalancingPolicy` or
:class:`policies.LoadBalancingPolicy`. Defaults to one of its subclasses. Defaults to :class:`~.RoundRobinPolicy`.
:class:`policies.RoundRobinPolicy`.
""" """
reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0) reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
@@ -168,6 +167,7 @@ class Cluster(object):
scheduler = None scheduler = None
executor = None executor = None
_is_shutdown = False _is_shutdown = False
_is_setup = False
_prepared_statements = None _prepared_statements = None
def __init__(self, def __init__(self,
@@ -175,7 +175,7 @@ class Cluster(object):
port=9042, port=9042,
compression=True, compression=True,
auth_provider=None, auth_provider=None,
load_balancing_policy_factory=None, load_balancing_policy=None,
reconnection_policy=None, reconnection_policy=None,
retry_policy_factory=None, retry_policy_factory=None,
conviction_policy_factory=None, conviction_policy_factory=None,
@@ -198,10 +198,8 @@ class Cluster(object):
raise ValueError("auth_provider must be callable") raise ValueError("auth_provider must be callable")
self.auth_provider = auth_provider self.auth_provider = auth_provider
if load_balancing_policy_factory is not None: if load_balancing_policy is not None:
if not callable(load_balancing_policy_factory): self.load_balancing_policy = load_balancing_policy
raise ValueError("load_balancing_policy_factory must be callable")
self.load_balancing_policy_factory = load_balancing_policy_factory
if reconnection_policy is not None: if reconnection_policy is not None:
self.reconnection_policy = reconnection_policy self.reconnection_policy = reconnection_policy
@@ -319,6 +317,11 @@ class Cluster(object):
if self._is_shutdown: if self._is_shutdown:
raise Exception("Cluster is already shut down") raise Exception("Cluster is already shut down")
if not self._is_setup:
self.load_balancing_policy.populate(
weakref.proxy(self), self.metadata.getAllHosts())
self._is_setup = True
if self.control_connection: if self.control_connection:
try: try:
self.control_connection.connect() self.control_connection.connect()
@@ -550,8 +553,7 @@ class Session(object):
self._lock = RLock() self._lock = RLock()
self._pools = {} self._pools = {}
self._load_balancer = cluster.load_balancing_policy_factory() self._load_balancer = cluster.load_balancing_policy
self._load_balancer.populate(weakref.proxy(cluster), hosts)
for host in hosts: for host in hosts:
self.add_host(host) self.add_host(host)
@@ -832,7 +834,7 @@ class ControlConnection(object):
# use a weak reference to allow the Cluster instance to be GC'ed (and # use a weak reference to allow the Cluster instance to be GC'ed (and
# shutdown) since implementing __del__ disables the cycle detector # shutdown) since implementing __del__ disables the cycle detector
self._cluster = weakref.proxy(cluster) self._cluster = weakref.proxy(cluster)
self._balancing_policy = cluster.load_balancing_policy_factory() self._balancing_policy = cluster.load_balancing_policy
self._balancing_policy.populate(cluster, []) self._balancing_policy.populate(cluster, [])
self._reconnection_policy = cluster.reconnection_policy self._reconnection_policy = cluster.reconnection_policy
self._connection = None self._connection = None

View File

@@ -424,7 +424,7 @@ class TableMetadata(object):
def as_cql_query(self, formatted=False): def as_cql_query(self, formatted=False):
""" """
Returns a CQL query that can be used to recreate this table (index Returns a CQL query that can be used to recreate this table (index
creations are not included). If `formatted` is set to ``True``, creations are not included). If `formatted` is set to :const:`True`,
extra whitespace will be added to make the query human readable. extra whitespace will be added to make the query human readable.
""" """
ret = "CREATE TABLE %s.%s (%s" % (self.keyspace.name, self.name, "\n" if formatted else "") ret = "CREATE TABLE %s.%s (%s" % (self.keyspace.name, self.name, "\n" if formatted else "")
@@ -553,7 +553,7 @@ class ColumnMetadata(object):
index = None index = None
""" """
If an index exists on this column, this is an instance of If an index exists on this column, this is an instance of
:class:`.IndexMetadata`, otherwise ``None``. :class:`.IndexMetadata`, otherwise :const:`None`.
""" """
def __init__(self, table_metadata, column_name, data_type, index_metadata=None): def __init__(self, table_metadata, column_name, data_type, index_metadata=None):

View File

@@ -75,7 +75,7 @@ class LoadBalancingPolicy(object):
order. A generator may work well for custom implementations order. A generator may work well for custom implementations
of this method. of this method.
Note that the `query` argument may be ``None`` when preparing Note that the `query` argument may be :const:`None` when preparing
statements. statements.
""" """
raise NotImplementedError() raise NotImplementedError()
@@ -175,8 +175,8 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
`used_hosts_per_remote_dc` controls how many nodes in `used_hosts_per_remote_dc` controls how many nodes in
each remote datacenter will have connections opened each remote datacenter will have connections opened
against them. In other words, `used_hosts_per_remote_dc` hosts against them. In other words, `used_hosts_per_remote_dc` hosts
will be considered :data:`.HostDistance.REMOTE` and the will be considered :attr:`~.HostDistance.REMOTE` and the
rest will be considered :data:`.HostDistance.IGNORED`. rest will be considered :attr:`~.HostDistance.IGNORED`.
By default, all remote hosts are ignored. By default, all remote hosts are ignored.
""" """
self.local_dc = local_dc self.local_dc = local_dc
@@ -241,6 +241,70 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
self._dc_live_hosts.setdefault(host.datacenter, set()).discard(host) self._dc_live_hosts.setdefault(host.datacenter, set()).discard(host)
class TokenAwarePolicy(LoadBalancingPolicy):
"""
A :class:`.LoadBalancingPolicy` wrapper that adds token awareness to
a child policy.
This alters the child policy's behavior so that it first attempts to
send queries to :attr:`~.HostDistance.LOCAL` replicas (as determined
by the child policy) based on the :class:`.Query`'s
:attr:`~.Query.routing_key`. Once those hosts are exhausted, the
remaining hosts in the child policy's query plan will be used.
If no :attr:`~.Query.routing_key` is set on the query, the child
policy's query plan will be used as is.
"""
_child_policy = None
_cluster_metadata = None
def __init__(self, child_policy):
self.child_policy = child_policy
def populate(self, cluster, hosts):
self._cluster_metadata = cluster.metadata
self.child_policy.populate(cluster, hosts)
def distance(self, *args, **kwargs):
return self.child_policy.distance(*args, **kwargs)
def make_query_plan(self, query=None):
child = self.child_policy
if query is None:
for host in child.make_query_plan(query):
yield host
else:
routing_key = query.routing_key
if routing_key is None:
for host in child.make_query_plan(query):
yield host
else:
replicas = self.metadata.get_replicas(routing_key)
for replica in replicas:
if replica.monitor.is_up and \
child.distance(replica) == HostDistance.LOCAL:
yield replica
for host in child.make_query_plan(query):
# skip if we've already listed this host
if host not in replicas or \
child.distance(replica) == HostDistance.REMOTE:
yield host
def on_up(self, *args, **kwargs):
return self.child_policy.on_up(*args, **kwargs)
def on_down(self, *args, **kwargs):
return self.child_policy.on_down(*args, **kwargs)
def on_add(self, *args, **kwargs):
return self.child_policy.on_add(*args, **kwargs)
def on_remove(self, *args, **kwargs):
return self.child_policy.on_remove(*args, **kwargs)
class ConvictionPolicy(object): class ConvictionPolicy(object):
""" """
A policy which decides when hosts should be considered down A policy which decides when hosts should be considered down
@@ -257,8 +321,8 @@ class ConvictionPolicy(object):
def add_failure(self, connection_exc): def add_failure(self, connection_exc):
""" """
Implementations should return ``True`` if the host should be Implementations should return :const:`True` if the host should be
convicted, ``False`` otherwise. convicted, :const:`False` otherwise.
""" """
raise NotImplementedError() raise NotImplementedError()
@@ -314,7 +378,7 @@ class ConstantReconnectionPolicy(ReconnectionPolicy):
each attempt. each attempt.
`max_attempts` should be a total number of attempts to be made before `max_attempts` should be a total number of attempts to be made before
giving up, or ``None`` to continue reconnection attempts forever. giving up, or :const:`None` to continue reconnection attempts forever.
The default is 64. The default is 64.
""" """
if delay < 0: if delay < 0:
@@ -431,7 +495,7 @@ class RetryPolicy(object):
perspective (i.e. a replica did not respond to the coordinator in time). perspective (i.e. a replica did not respond to the coordinator in time).
It should return a tuple with two items: one of the class enums (such It should return a tuple with two items: one of the class enums (such
as :attr:`.RETRY`) and a :class:`.ConsistencyLevel` to retry the as :attr:`.RETRY`) and a :class:`.ConsistencyLevel` to retry the
operation at or ``None`` to keep the same consistency level. operation at or :const:`None` to keep the same consistency level.
`query` is the :class:`.Query` that timed out. `query` is the :class:`.Query` that timed out.
@@ -541,7 +605,7 @@ class DowngradingConsistencyRetryPolicy(RetryPolicy):
level than the one initially requested. By doing so, it may break level than the one initially requested. By doing so, it may break
consistency guarantees. In other words, if you use this retry policy, consistency guarantees. In other words, if you use this retry policy,
there is cases (documented below) where a read at :attr:`~.QUORUM` there is cases (documented below) where a read at :attr:`~.QUORUM`
*may not* see a preceding write at :attr`~.QUORUM`. Do not use this *may not* see a preceding write at :attr:`~.QUORUM`. Do not use this
policy unless you have understood the cases where this can happen and policy unless you have understood the cases where this can happen and
are ok with that. It is also recommended to subclass this class so are ok with that. It is also recommended to subclass this class so
that queries that required a consistency level downgrade can be that queries that required a consistency level downgrade can be

View File

@@ -158,9 +158,10 @@ class _ReconnectionHandler(object):
number of seconds (as a float) that the handler will wait before number of seconds (as a float) that the handler will wait before
attempting to connect again. attempting to connect again.
Subclasses should return ``False`` if no more attempts to connection Subclasses should return :const:`False` if no more attempts to
should be made, ``True`` otherwise. The default behavior is to connection should be made, :const:`True` otherwise. The default
always retry unless the error is an :exc:`.AuthenticationFailed`. behavior is to always retry unless the error is an
:exc:`.AuthenticationFailed` instance.
""" """
if isinstance(exc, AuthenticationFailed): if isinstance(exc, AuthenticationFailed):
return False return False

View File

@@ -26,7 +26,7 @@ class Query(object):
tracing_enabled = False tracing_enabled = False
""" """
A boolean flag that may be set to ``True`` to enable tracing on this A boolean flag that may be set to :const:`True` to enable tracing on this
query only. query only.
**Note**: query tracing is not yet supported by this driver **Note**: query tracing is not yet supported by this driver
@@ -45,18 +45,30 @@ class Query(object):
self.consistency_level = consistency_level self.consistency_level = consistency_level
self._routing_key = routing_key self._routing_key = routing_key
@property def _get_routing_key(self):
def routing_key(self):
return self._routing_key return self._routing_key
@routing_key.setter def _set_routing_key(self, key_components):
def set_routing_key(self, *key_components):
if len(key_components) == 1: if len(key_components) == 1:
self._routing_key = key_components[0] self._routing_key = key_components[0]
else: else:
self._routing_key = "".join(struct.pack("HsB", len(component), component, 0) self._routing_key = "".join(struct.pack("HsB", len(component), component, 0)
for component in key_components) for component in key_components)
def _del_routing_key(self):
self._routing_key = None
routing_key = property(
_get_routing_key,
_set_routing_key,
_del_routing_key,
"""
The :attr:`~.TableMetadata.partition_key` portion of the primary key,
which can be used to determine which nodes are replicas for the query.
When setting this attribute, a list or tuple *must* be used.
""")
class SimpleStatement(Query): class SimpleStatement(Query):
""" """
A simple, un-prepared query. All attributes of :class:`Query` apply A simple, un-prepared query. All attributes of :class:`Query` apply

View File

@@ -45,8 +45,8 @@ class ClusterTests(unittest.TestCase):
def foo(*args, **kwargs): def foo(*args, **kwargs):
return Mock() return Mock()
for kw in ('auth_provider', 'load_balancing_policy_factory', for kw in ('auth_provider', 'retry_policy_factory',
'retry_policy_factory', 'conviction_policy_factory'): 'conviction_policy_factory'):
kwargs = {kw: 123} kwargs = {kw: 123}
self.assertRaises(ValueError, Cluster, **kwargs) self.assertRaises(ValueError, Cluster, **kwargs)
@@ -55,7 +55,8 @@ class ClusterTests(unittest.TestCase):
self.assertEquals(getattr(c, kw), foo) self.assertEquals(getattr(c, kw), foo)
for kw in ('contact_points', 'port', 'compression', 'metrics_enabled', for kw in ('contact_points', 'port', 'compression', 'metrics_enabled',
'reconnection_policy', 'sockopts', 'max_schema_agreement_wait'): 'load_balancing_policy', 'reconnection_policy', 'sockopts',
'max_schema_agreement_wait'):
kwargs = {kw: (1, 2, 3)} kwargs = {kw: (1, 2, 3)}
c = Cluster(**kwargs) c = Cluster(**kwargs)
self.assertEquals(getattr(c, kw), (1, 2, 3)) self.assertEquals(getattr(c, kw), (1, 2, 3))

View File

@@ -39,7 +39,7 @@ class MockMetadata(object):
class MockCluster(object): class MockCluster(object):
max_schema_agreement_wait = Cluster.max_schema_agreement_wait max_schema_agreement_wait = Cluster.max_schema_agreement_wait
load_balancing_policy_factory = RoundRobinPolicy load_balancing_policy = RoundRobinPolicy()
reconnection_policy = ConstantReconnectionPolicy(2) reconnection_policy = ConstantReconnectionPolicy(2)
def __init__(self): def __init__(self):