Merge remote-tracking branch 'origin/3.3'

Adam Holmberg
2016-04-12 16:22:09 -05:00
14 changed files with 294 additions and 163 deletions

View File

@@ -194,7 +194,11 @@ class UserAggregateDescriptor(SignatureDescriptor):
"""
class Unavailable(Exception):
class DriverException(Exception):
pass
class Unavailable(DriverException):
"""
There were not enough live replicas to satisfy the requested consistency
level, so the coordinator node immediately failed the request without
@@ -220,7 +224,7 @@ class Unavailable(Exception):
'alive_replicas': alive_replicas}))
class Timeout(Exception):
class Timeout(DriverException):
"""
Replicas failed to respond to the coordinator node before timing out.
"""
@@ -289,7 +293,7 @@ class WriteTimeout(Timeout):
self.write_type = write_type
class CoordinationFailure(Exception):
class CoordinationFailure(DriverException):
"""
Replicas sent a failure to the coordinator.
"""
@@ -359,7 +363,7 @@ class WriteFailure(CoordinationFailure):
self.write_type = write_type
class FunctionFailure(Exception):
class FunctionFailure(DriverException):
"""
User Defined Function failed during execution
"""
@@ -386,7 +390,7 @@ class FunctionFailure(Exception):
Exception.__init__(self, summary_message)
class AlreadyExists(Exception):
class AlreadyExists(DriverException):
"""
An attempt was made to create a keyspace or table that already exists.
"""
@@ -414,7 +418,7 @@ class AlreadyExists(Exception):
self.table = table
class InvalidRequest(Exception):
class InvalidRequest(DriverException):
"""
A query was made that was invalid for some reason, such as trying to set
the keyspace for a connection to a nonexistent keyspace.
@@ -422,21 +426,21 @@ class InvalidRequest(Exception):
pass
class Unauthorized(Exception):
class Unauthorized(DriverException):
"""
The current user is not authorized to perform the requested operation.
"""
pass
class AuthenticationFailed(Exception):
class AuthenticationFailed(DriverException):
"""
Failed to authenticate.
"""
pass
class OperationTimedOut(Exception):
class OperationTimedOut(DriverException):
"""
The operation took longer than the specified (client-side) timeout
to complete. This is not an error generated by Cassandra, only
@@ -460,7 +464,7 @@ class OperationTimedOut(Exception):
Exception.__init__(self, message)
class UnsupportedOperation(Exception):
class UnsupportedOperation(DriverException):
"""
An attempt was made to use a feature that is not supported by the
selected protocol version. See :attr:`Cluster.protocol_version`
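With this change, the driver's own error types share the DriverException base, so applications can catch them in a single except clause. A minimal sketch (the contact point and query are illustrative assumptions; note that lower-level errors such as ConnectionException keep their existing hierarchy in this diff):

    from cassandra import DriverException
    from cassandra.cluster import Cluster

    cluster = Cluster(['127.0.0.1'])  # assumed contact point
    session = cluster.connect()
    try:
        session.execute("SELECT release_version FROM system.local")
    except DriverException as exc:
        # Unavailable, Timeout, OperationTimedOut, AlreadyExists, etc.
        print("driver error: %s" % exc)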

View File

@@ -45,7 +45,7 @@ from itertools import groupby, count
from cassandra import (ConsistencyLevel, AuthenticationFailed,
OperationTimedOut, UnsupportedOperation,
SchemaTargetType)
SchemaTargetType, DriverException)
from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported)
from cassandra.cqltypes import UserType
@@ -65,7 +65,7 @@ from cassandra.protocol import (QueryMessage, ResultMessage,
from cassandra.metadata import Metadata, protect_name, murmur3
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
RetryPolicy)
RetryPolicy, IdentityTranslator)
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
HostConnectionPool, HostConnection,
NoConnectionsAvailable)
@@ -347,6 +347,12 @@ class Cluster(object):
:class:`.policies.SimpleConvictionPolicy`.
"""
address_translator = IdentityTranslator()
"""
:class:`.policies.AddressTranslator` instance to be used in translating server node addresses
to driver connection addresses.
"""
connect_to_remote_hosts = True
"""
If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE`
@@ -481,6 +487,37 @@ class Cluster(object):
establishment, options passing, and authentication.
"""
@property
def schema_metadata_enabled(self):
"""
Flag indicating whether internal schema metadata is updated.
When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This
can be used to speed initial connection, and reduce load on client and server during operation. Turning this off
forgoes token-aware request routing and programmatic inspection of the metadata model.
"""
return self.control_connection._schema_meta_enabled
@schema_metadata_enabled.setter
def schema_metadata_enabled(self, enabled):
self.control_connection._schema_meta_enabled = bool(enabled)
@property
def token_metadata_enabled(self):
"""
Flag indicating whether internal token metadata is updated.
When disabled, the driver does not query node token information on connect, or on topology change events. This
can be used to speed initial connection, and reduce load on client and server during operation. It is most useful
in large clusters using vnodes, where the token map can be expensive to compute. Turning this off
forgoes token-aware request routing and programmatic inspection of the token ring.
"""
return self.control_connection._token_meta_enabled
@token_metadata_enabled.setter
def token_metadata_enabled(self, enabled):
self.control_connection._token_meta_enabled = bool(enabled)
sessions = None
control_connection = None
scheduler = None
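Both flags can also be set up front through the new constructor arguments; a sketch, assuming a reachable local node:

    from cassandra.cluster import Cluster

    # Skip schema and token metadata for a faster, lighter connection.
    # This forgoes token-aware routing and Cluster.metadata inspection.
    cluster = Cluster(['127.0.0.1'],
                      schema_metadata_enabled=False,
                      token_metadata_enabled=False)
    session = cluster.connect()
    # Both remain settable at runtime via the properties above:
    cluster.token_metadata_enabled = True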
@@ -520,7 +557,10 @@ class Cluster(object):
idle_heartbeat_interval=30,
schema_event_refresh_window=2,
topology_event_refresh_window=10,
connect_timeout=5):
connect_timeout=5,
schema_metadata_enabled=True,
token_metadata_enabled=True,
address_translator=None):
"""
Any of the mutable Cluster attributes may be set as keyword arguments
to the constructor.
@@ -529,9 +569,15 @@ class Cluster(object):
if isinstance(contact_points, six.string_types):
raise TypeError("contact_points should not be a string, it should be a sequence (e.g. list) of strings")
if None in contact_points:
raise ValueError("contact_points should not contain None (it can resolve to localhost)")
self.contact_points = contact_points
self.port = port
self.contact_points_resolved = [endpoint[4][0] for a in self.contact_points
for endpoint in socket.getaddrinfo(a, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)]
self.compression = compression
self.protocol_version = protocol_version
self.auth_provider = auth_provider
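For reference, the resolution above is what every contact point now goes through; a dual-stack name can expand to several addresses. This snippet only illustrates the getaddrinfo call used by the constructor (output is platform-dependent):

    import socket

    addrs = [endpoint[4][0] for endpoint in
             socket.getaddrinfo('localhost', 9042, socket.AF_UNSPEC, socket.SOCK_STREAM)]
    # e.g. ['127.0.0.1', '::1'] -- each entry becomes a distinct contact point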
@@ -539,7 +585,6 @@ class Cluster(object):
if load_balancing_policy is not None:
if isinstance(load_balancing_policy, type):
raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
self.load_balancing_policy = load_balancing_policy
else:
self.load_balancing_policy = default_lbp_factory()
@@ -547,13 +592,11 @@ class Cluster(object):
if reconnection_policy is not None:
if isinstance(reconnection_policy, type):
raise TypeError("reconnection_policy should not be a class, it should be an instance of that class")
self.reconnection_policy = reconnection_policy
if default_retry_policy is not None:
if isinstance(default_retry_policy, type):
raise TypeError("default_retry_policy should not be a class, it should be an instance of that class")
self.default_retry_policy = default_retry_policy
if conviction_policy_factory is not None:
@@ -561,6 +604,11 @@ class Cluster(object):
raise ValueError("conviction_policy_factory must be callable")
self.conviction_policy_factory = conviction_policy_factory
if address_translator is not None:
if isinstance(address_translator, type):
raise TypeError("address_translator should not be a class, it should be an instance of that class")
self.address_translator = address_translator
if connection_class is not None:
self.connection_class = connection_class
@@ -619,7 +667,9 @@ class Cluster(object):
self.control_connection = ControlConnection(
self, self.control_connection_timeout,
self.schema_event_refresh_window, self.topology_event_refresh_window)
self.schema_event_refresh_window, self.topology_event_refresh_window,
schema_metadata_enabled, token_metadata_enabled)
def register_user_type(self, keyspace, user_type, klass):
"""
@@ -813,7 +863,7 @@ class Cluster(object):
log.warning("Downgrading core protocol version from %d to %d for %s", self.protocol_version, new_version, host_addr)
self.protocol_version = new_version
else:
raise Exception("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION))
raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION))
def connect(self, keyspace=None):
"""
@@ -823,14 +873,14 @@ class Cluster(object):
"""
with self._lock:
if self.is_shutdown:
raise Exception("Cluster is already shut down")
raise DriverException("Cluster is already shut down")
if not self._is_setup:
log.debug("Connecting to cluster, contact points: %s; protocol version: %s",
self.contact_points, self.protocol_version)
self.connection_class.initialize_reactor()
atexit.register(partial(_shutdown_cluster, self))
for address in self.contact_points:
for address in self.contact_points_resolved:
host, new = self.add_host(address, signal=False)
if new:
host.set_up()
@@ -1244,8 +1294,8 @@ class Cluster(object):
An Exception is raised if schema refresh fails for any reason.
"""
if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait):
raise Exception("Schema metadata was not refreshed. See log for details.")
if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("Schema metadata was not refreshed. See log for details.")
def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
"""
@@ -1255,8 +1305,8 @@ class Cluster(object):
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
"""
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.KEYSPACE, keyspace=keyspace,
schema_agreement_wait=max_schema_agreement_wait):
raise Exception("Keyspace metadata was not refreshed. See log for details.")
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("Keyspace metadata was not refreshed. See log for details.")
def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None):
"""
@@ -1265,8 +1315,9 @@ class Cluster(object):
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
"""
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table, schema_agreement_wait=max_schema_agreement_wait):
raise Exception("Table metadata was not refreshed. See log for details.")
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table,
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("Table metadata was not refreshed. See log for details.")
def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreement_wait=None):
"""
@@ -1274,8 +1325,9 @@ class Cluster(object):
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
"""
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view, schema_agreement_wait=max_schema_agreement_wait):
raise Exception("View metadata was not refreshed. See log for details.")
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view,
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("View metadata was not refreshed. See log for details.")
def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_wait=None):
"""
@@ -1283,8 +1335,9 @@ class Cluster(object):
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
"""
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type, schema_agreement_wait=max_schema_agreement_wait):
raise Exception("User Type metadata was not refreshed. See log for details.")
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type,
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("User Type metadata was not refreshed. See log for details.")
def refresh_user_function_metadata(self, keyspace, function, max_schema_agreement_wait=None):
"""
@@ -1294,8 +1347,9 @@ class Cluster(object):
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
"""
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function, schema_agreement_wait=max_schema_agreement_wait):
raise Exception("User Function metadata was not refreshed. See log for details.")
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function,
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("User Function metadata was not refreshed. See log for details.")
def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreement_wait=None):
"""
@@ -1305,8 +1359,9 @@ class Cluster(object):
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
"""
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate, schema_agreement_wait=max_schema_agreement_wait):
raise Exception("User Aggregate metadata was not refreshed. See log for details.")
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate,
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("User Aggregate metadata was not refreshed. See log for details.")
def refresh_nodes(self):
"""
@@ -1315,10 +1370,12 @@ class Cluster(object):
An Exception is raised if node refresh fails for any reason.
"""
if not self.control_connection.refresh_node_list_and_token_map():
raise Exception("Node list was not refreshed. See log for details.")
raise DriverException("Node list was not refreshed. See log for details.")
def set_meta_refresh_enabled(self, enabled):
"""
*Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` and :attr:`~.Cluster.token_metadata_enabled` instead
Sets a flag to enable (True) or disable (False) all metadata refresh queries.
This applies to both schema and node topology.
@@ -1327,7 +1384,8 @@ class Cluster(object):
Meta refresh must be enabled for the driver to become aware of any cluster
topology changes or schema updates.
"""
self.control_connection.set_meta_refresh_enabled(bool(enabled))
self.schema_metadata_enabled = enabled
self.token_metadata_enabled = enabled
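Migration for callers of the deprecated method is mechanical; a sketch:

    # before (deprecated):
    cluster.set_meta_refresh_enabled(False)
    # after -- the two kinds of refresh are now controlled independently:
    cluster.schema_metadata_enabled = False
    cluster.token_metadata_enabled = False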
def _prepare_all_queries(self, host):
if not self._prepared_statements:
@@ -2009,8 +2067,11 @@ class ControlConnection(object):
Internal
"""
_SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
_SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
_SELECT_PEERS = "SELECT * FROM system.peers"
_SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, schema_version FROM system.peers"
_SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'"
_SELECT_LOCAL_NO_TOKENS = "SELECT cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
_SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers"
_SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
@@ -2022,14 +2083,17 @@ class ControlConnection(object):
_schema_event_refresh_window = None
_topology_event_refresh_window = None
_meta_refresh_enabled = True
_schema_meta_enabled = True
_token_meta_enabled = True
# for testing purposes
_time = time
def __init__(self, cluster, timeout,
schema_event_refresh_window,
topology_event_refresh_window):
topology_event_refresh_window,
schema_meta_enabled=True,
token_meta_enabled=True):
# use a weak reference to allow the Cluster instance to be GC'ed (and
# shutdown) since implementing __del__ disables the cycle detector
self._cluster = weakref.proxy(cluster)
@@ -2038,6 +2102,8 @@ class ControlConnection(object):
self._schema_event_refresh_window = schema_event_refresh_window
self._topology_event_refresh_window = topology_event_refresh_window
self._schema_meta_enabled = schema_meta_enabled
self._token_meta_enabled = token_meta_enabled
self._lock = RLock()
self._schema_agreement_lock = Lock()
@@ -2119,8 +2185,10 @@ class ControlConnection(object):
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
}, register_timeout=self._timeout)
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=ConsistencyLevel.ONE)
sel_peers = self._SELECT_PEERS if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
shared_results = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)
@@ -2200,14 +2268,10 @@ class ControlConnection(object):
self._connection.close()
del self._connection
def refresh_schema(self, **kwargs):
if not self._meta_refresh_enabled:
log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
return False
def refresh_schema(self, force=False, **kwargs):
try:
if self._connection:
return self._refresh_schema(self._connection, **kwargs)
return self._refresh_schema(self._connection, force=force, **kwargs)
except ReferenceError:
pass # our weak reference to the Cluster is no good
except Exception:
@@ -2215,13 +2279,18 @@ class ControlConnection(object):
self._signal_error()
return False
def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, **kwargs):
def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, force=False, **kwargs):
if self._cluster.is_shutdown:
return False
agreed = self.wait_for_schema_agreement(connection,
preloaded_results=preloaded_results,
wait_time=schema_agreement_wait)
if not self._schema_meta_enabled and not force:
log.debug("[control connection] Skipping schema refresh because schema metadata is disabled")
return False
if not agreed:
log.debug("Skipping schema refresh due to lack of schema agreement")
return False
@@ -2231,10 +2300,6 @@ class ControlConnection(object):
return True
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
if not self._meta_refresh_enabled:
log.debug("[control connection] Skipping node list refresh because meta refresh is disabled")
return False
try:
if self._connection:
self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
@@ -2254,10 +2319,17 @@ class ControlConnection(object):
peers_result = preloaded_results[0]
local_result = preloaded_results[1]
else:
log.debug("[control connection] Refreshing node list and token map")
cl = ConsistencyLevel.ONE
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=cl)
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=cl)
if not self._token_meta_enabled:
log.debug("[control connection] Refreshing node list without token map")
sel_peers = self._SELECT_PEERS_NO_TOKENS
sel_local = self._SELECT_LOCAL_NO_TOKENS
else:
log.debug("[control connection] Refreshing node list and token map")
sel_peers = self._SELECT_PEERS
sel_local = self._SELECT_LOCAL
peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
local_query = QueryMessage(query=sel_local, consistency_level=cl)
peers_result, local_result = connection.wait_for_responses(
peers_query, local_query, timeout=self._timeout)
@@ -2277,6 +2349,8 @@ class ControlConnection(object):
datacenter = local_row.get("data_center")
rack = local_row.get("rack")
self._update_location_info(host, datacenter, rack)
host.listen_address = local_row.get("listen_address")
host.broadcast_address = local_row.get("broadcast_address")
partitioner = local_row.get("partitioner")
tokens = local_row.get("tokens")
@@ -2291,13 +2365,10 @@ class ControlConnection(object):
should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None
found_hosts = set()
for row in peers_result:
addr = row.get("rpc_address")
addr = self._rpc_from_peer_row(row)
if not addr or addr in ["0.0.0.0", "::"]:
addr = row.get("peer")
tokens = row.get("tokens")
if not tokens:
tokens = row.get("tokens", None)
if 'tokens' in row and not tokens: # it was selected, but empty
log.warning("Excluding host (%s) with no tokens in system.peers table of %s." % (addr, connection.host))
continue
@@ -2313,6 +2384,8 @@ class ControlConnection(object):
else:
should_rebuild_token_map |= self._update_location_info(host, datacenter, rack)
host.broadcast_address = row.get("peer")
if partitioner and tokens:
token_map[host] = tokens
@@ -2320,7 +2393,7 @@ class ControlConnection(object):
if old_host.address != connection.host and old_host.address not in found_hosts:
should_rebuild_token_map = True
if old_host.address not in self._cluster.contact_points:
log.debug("[control connection] Found host that has been removed: %r", old_host)
log.debug("[control connection] Removing host not found in peers metadata: %r", old_host)
self._cluster.remove_host(old_host)
log.debug("[control connection] Finished fetching ring info")
@@ -2356,7 +2429,7 @@ class ControlConnection(object):
def _handle_topology_change(self, event):
change_type = event["change_type"]
addr, port = event["address"]
addr = self._translate_address(event["address"][0])
if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
if self._topology_event_refresh_window >= 0:
delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window)
@@ -2367,7 +2440,7 @@ class ControlConnection(object):
def _handle_status_change(self, event):
change_type = event["change_type"]
addr, port = event["address"]
addr = self._translate_address(event["address"][0])
host = self._cluster.metadata.get_host(addr)
if change_type == "UP":
delay = 1 + self._delay_for_event_type('status_change', 0.5) # randomness to avoid thundering herd problem on events
@@ -2385,6 +2458,9 @@ class ControlConnection(object):
# this will be run by the scheduler
self._cluster.on_down(host, is_host_addition=False)
def _translate_address(self, addr):
return self._cluster.address_translator.translate(addr)
def _handle_schema_change(self, event):
if self._schema_event_refresh_window < 0:
return
@@ -2466,11 +2542,7 @@ class ControlConnection(object):
schema_ver = row.get('schema_version')
if not schema_ver:
continue
addr = row.get("rpc_address")
if not addr or addr in ["0.0.0.0", "::"]:
addr = row.get("peer")
addr = self._rpc_from_peer_row(row)
peer = self._cluster.metadata.get_host(addr)
if peer and peer.is_up:
versions[schema_ver].add(addr)
@@ -2481,6 +2553,12 @@ class ControlConnection(object):
return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
def _rpc_from_peer_row(self, row):
addr = row.get("rpc_address")
if not addr or addr in ["0.0.0.0", "::"]:
addr = row.get("peer")
return self._translate_address(addr)
def _signal_error(self):
with self._lock:
if self._is_shutdown:
@@ -2529,9 +2607,6 @@ class ControlConnection(object):
if connection is self._connection and (connection.is_defunct or connection.is_closed):
self.reconnect()
def set_meta_refresh_enabled(self, enabled):
self._meta_refresh_enabled = enabled
def _stop_scheduler(scheduler, thread):
try:
@@ -2626,13 +2701,9 @@ class _Scheduler(object):
def refresh_schema_and_set_result(control_conn, response_future, **kwargs):
try:
if control_conn._meta_refresh_enabled:
log.debug("Refreshing schema in response to schema change. "
"%s", kwargs)
response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs)
else:
log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
"%s", kwargs)
log.debug("Refreshing schema in response to schema change. "
"%s", kwargs)
response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs)
except Exception:
log.exception("Exception refreshing schema in response to schema change:")
response_future.session.submit(control_conn.refresh_schema, **kwargs)
@@ -2717,7 +2788,16 @@ class ResponseFuture(object):
self._timer.cancel()
def _on_timeout(self):
self._set_final_exception(OperationTimedOut(self._errors, self._current_host))
errors = self._errors
if not errors:
if self.is_schema_agreed:
errors = {self._current_host.address: "Client request timeout. See Session.execute[_async](timeout)"}
else:
connection = getattr(self.session.cluster.control_connection, '_connection')
host = connection.host if connection else 'unknown'
errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."}
self._set_final_exception(OperationTimedOut(errors, self._current_host))
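The richer message surfaces in OperationTimedOut when the client-side timeout fires; a sketch with an artificially small per-request timeout (contact point assumed):

    from cassandra import OperationTimedOut
    from cassandra.cluster import Cluster

    session = Cluster(['127.0.0.1']).connect()
    try:
        session.execute("SELECT * FROM system.local", timeout=0.001)  # seconds, client-side
    except OperationTimedOut as exc:
        print(exc.errors)  # per-host detail, including the new hint text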
def _make_query_plan(self):
# convert the list/generator/etc to an iterator so that subsequent
@@ -2809,7 +2889,7 @@ class ResponseFuture(object):
"""
# TODO: When timers are introduced, just make this wait
if not self._event.is_set():
raise Exception("warnings cannot be retrieved before ResponseFuture is finalized")
raise DriverException("warnings cannot be retrieved before ResponseFuture is finalized")
return self._warnings
@property
@@ -2827,7 +2907,7 @@ class ResponseFuture(object):
"""
# TODO: When timers are introduced, just make this wait
if not self._event.is_set():
raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized")
raise DriverException("custom_payload cannot be retrieved before ResponseFuture is finalized")
return self._custom_payload
def start_fetching_next_page(self):
@@ -2982,15 +3062,17 @@ class ResponseFuture(object):
return
retry_type, consistency = retry
if retry_type is RetryPolicy.RETRY:
if retry_type in (RetryPolicy.RETRY, RetryPolicy.RETRY_NEXT_HOST):
self._query_retries += 1
self._retry(reuse_connection=True, consistency_level=consistency)
reuse = retry_type == RetryPolicy.RETRY
self._retry(reuse_connection=reuse, consistency_level=consistency)
elif retry_type is RetryPolicy.RETHROW:
self._set_final_exception(response.to_exception())
else: # IGNORE
if self._metrics is not None:
self._metrics.on_ignore()
self._set_final_result(None)
self._errors[self._current_host] = response.to_exception()
elif isinstance(response, ConnectionException):
if self._metrics is not None:
self._metrics.on_connection_error()

View File

@@ -125,6 +125,7 @@ class _Frame(object):
NONBLOCKING = (errno.EAGAIN, errno.EWOULDBLOCK)
class ConnectionException(Exception):
"""
An unrecoverable error was hit when attempting to use a connection,
@@ -319,12 +320,14 @@ class Connection(object):
def _connect_socket(self):
sockerr = None
addresses = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
if not addresses:
raise ConnectionException("getaddrinfo returned empty list for %s" % (self.host,))
for (af, socktype, proto, canonname, sockaddr) in addresses:
try:
self._socket = self._socket_impl.socket(af, socktype, proto)
if self.ssl_options:
if not self._ssl_impl:
raise Exception("This version of Python was not compiled with SSL support")
raise RuntimeError("This version of Python was not compiled with SSL support")
self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
self._socket.settimeout(self.connect_timeout)
self._socket.connect(sockaddr)

View File

@@ -14,17 +14,16 @@
import gevent
import gevent.event
from gevent.queue import Queue
from gevent import select, socket
from gevent import socket
import gevent.ssl
from functools import partial
import logging
import os
import time
from six.moves import range
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL
from errno import EINVAL
from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager
@@ -34,9 +33,8 @@ log = logging.getLogger(__name__)
def is_timeout(err):
return (
err in (EINPROGRESS, EALREADY, EWOULDBLOCK) or
(err == EINVAL and os.name in ('nt', 'ce')) or
isinstance(err, socket.timeout)
isinstance(err, socket.timeout)
)
@@ -118,44 +116,23 @@ class GeventConnection(Connection):
self.close()
def handle_write(self):
run_select = partial(select.select, (), (self._socket,), ())
while True:
try:
next_msg = self._write_queue.get()
run_select()
except Exception as exc:
if not self.is_closed:
log.debug("Exception during write select() for %s: %s", self, exc)
self.defunct(exc)
return
try:
self._socket.sendall(next_msg)
except socket.error as err:
log.debug("Exception during socket sendall for %s: %s", self, err)
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return # Leave the write loop
def handle_read(self):
run_select = partial(select.select, (self._socket,), (), ())
while True:
try:
run_select()
except Exception as exc:
if not self.is_closed:
log.debug("Exception during read select() for %s: %s", self, exc)
self.defunct(exc)
return
def handle_read(self):
while True:
try:
while True:
buf = self._socket.recv(self.in_buffer_size)
self._iobuf.write(buf)
if len(buf) < self.in_buffer_size:
break
buf = self._socket.recv(self.in_buffer_size)
self._iobuf.write(buf)
except socket.error as err:
if not is_timeout(err):
log.debug("Exception during socket recv for %s: %s", self, err)
log.debug("Exception in read for %s: %s", self, err)
self.defunct(err)
return # leave the read loop

View File

@@ -1402,8 +1402,10 @@ class TokenMap(object):
with self._rebuild_lock:
current = self.tokens_to_hosts_by_ks.get(keyspace, None)
if (build_if_absent and current is None) or (not build_if_absent and current is not None):
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
self.tokens_to_hosts_by_ks[keyspace] = replica_map
ks_meta = self._metadata.keyspaces.get(keyspace)
if ks_meta:
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
self.tokens_to_hosts_by_ks[keyspace] = replica_map
def replica_map_for_keyspace(self, ks_metadata):
strategy = ks_metadata.replication_strategy

View File

@@ -16,12 +16,9 @@ from itertools import islice, cycle, groupby, repeat
import logging
from random import randint
from threading import Lock
import six
from cassandra import ConsistencyLevel
from six.moves import range
log = logging.getLogger(__name__)
@@ -235,7 +232,7 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
self._dc_live_hosts[dc] = tuple(set(dc_hosts))
if not self.local_dc:
self._contact_points = cluster.contact_points
self._contact_points = cluster.contact_points_resolved
self._position = randint(0, len(hosts) - 1) if hosts else 0
@@ -337,7 +334,7 @@ class TokenAwarePolicy(LoadBalancingPolicy):
def check_supported(self):
if not self._cluster_metadata.can_support_partitioner():
raise Exception(
raise RuntimeError(
'%s cannot be used with the cluster partitioner (%s) because '
'the relevant C extension for this driver was not compiled. '
'See the installation instructions for details on building '
@@ -554,8 +551,8 @@ class ExponentialReconnectionPolicy(ReconnectionPolicy):
self.max_attempts = max_attempts
def new_schedule(self):
i=0
while self.max_attempts == None or i < self.max_attempts:
i = 0
while self.max_attempts is None or i < self.max_attempts:
yield min(self.base_delay * (2 ** i), self.max_delay)
i += 1
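With the fixed comparison, the schedule is a plain capped exponential; a quick check (constructor arguments assumed from this class):

    from itertools import islice
    from cassandra.policies import ExponentialReconnectionPolicy

    policy = ExponentialReconnectionPolicy(base_delay=1, max_delay=60, max_attempts=10)
    print(list(islice(policy.new_schedule(), 8)))
    # [1, 2, 4, 8, 16, 32, 60, 60]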
@@ -650,6 +647,12 @@ class RetryPolicy(object):
should be ignored but no more retries should be attempted.
"""
RETRY_NEXT_HOST = 3
"""
This should be returned from the below methods if the operation
should be retried on another connection.
"""
def on_read_timeout(self, query, consistency, required_responses,
received_responses, data_retrieved, retry_num):
"""
@@ -677,11 +680,11 @@ class RetryPolicy(object):
a sufficient number of replicas responded (with data digests).
"""
if retry_num != 0:
return (self.RETHROW, None)
return self.RETHROW, None
elif received_responses >= required_responses and not data_retrieved:
return (self.RETRY, consistency)
return self.RETRY, consistency
else:
return (self.RETHROW, None)
return self.RETHROW, None
def on_write_timeout(self, query, consistency, write_type,
required_responses, received_responses, retry_num):
@@ -710,11 +713,11 @@ class RetryPolicy(object):
:attr:`~.WriteType.BATCH_LOG`.
"""
if retry_num != 0:
return (self.RETHROW, None)
return self.RETHROW, None
elif write_type == WriteType.BATCH_LOG:
return (self.RETRY, consistency)
return self.RETRY, consistency
else:
return (self.RETHROW, None)
return self.RETHROW, None
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
"""
@@ -739,7 +742,7 @@ class RetryPolicy(object):
By default, no retries will be attempted and the error will be re-raised.
"""
return (self.RETHROW, None)
return (self.RETRY_NEXT_HOST, consistency) if retry_num == 0 else (self.RETHROW, None)
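A policy that keeps hopping hosts on Unavailable (rather than only on the first try) could subclass as below; this is an editor's sketch, not part of the commit:

    from cassandra.policies import RetryPolicy

    class HostHoppingPolicy(RetryPolicy):
        # hypothetical: try up to three other coordinators before giving up
        def on_unavailable(self, query, consistency, required_replicas,
                           alive_replicas, retry_num):
            if retry_num < 3:
                return self.RETRY_NEXT_HOST, consistency
            return self.RETHROW, None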
class FallthroughRetryPolicy(RetryPolicy):
@@ -749,13 +752,13 @@ class FallthroughRetryPolicy(RetryPolicy):
"""
def on_read_timeout(self, *args, **kwargs):
return (self.RETHROW, None)
return self.RETHROW, None
def on_write_timeout(self, *args, **kwargs):
return (self.RETHROW, None)
return self.RETHROW, None
def on_unavailable(self, *args, **kwargs):
return (self.RETHROW, None)
return self.RETHROW, None
class DowngradingConsistencyRetryPolicy(RetryPolicy):
@@ -807,45 +810,73 @@ class DowngradingConsistencyRetryPolicy(RetryPolicy):
"""
def _pick_consistency(self, num_responses):
if num_responses >= 3:
return (self.RETRY, ConsistencyLevel.THREE)
return self.RETRY, ConsistencyLevel.THREE
elif num_responses >= 2:
return (self.RETRY, ConsistencyLevel.TWO)
return self.RETRY, ConsistencyLevel.TWO
elif num_responses >= 1:
return (self.RETRY, ConsistencyLevel.ONE)
return self.RETRY, ConsistencyLevel.ONE
else:
return (self.RETHROW, None)
return self.RETHROW, None
def on_read_timeout(self, query, consistency, required_responses,
received_responses, data_retrieved, retry_num):
if retry_num != 0:
return (self.RETHROW, None)
return self.RETHROW, None
elif received_responses < required_responses:
return self._pick_consistency(received_responses)
elif not data_retrieved:
return (self.RETRY, consistency)
return self.RETRY, consistency
else:
return (self.RETHROW, None)
return self.RETHROW, None
def on_write_timeout(self, query, consistency, write_type,
required_responses, received_responses, retry_num):
if retry_num != 0:
return (self.RETHROW, None)
return self.RETHROW, None
if write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER):
if received_responses > 0:
# persisted on at least one replica
return (self.IGNORE, None)
return self.IGNORE, None
else:
return (self.RETHROW, None)
return self.RETHROW, None
elif write_type == WriteType.UNLOGGED_BATCH:
return self._pick_consistency(received_responses)
elif write_type == WriteType.BATCH_LOG:
return (self.RETRY, consistency)
return self.RETRY, consistency
return (self.RETHROW, None)
return self.RETHROW, None
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
if retry_num != 0:
return (self.RETHROW, None)
return self.RETHROW, None
else:
return self._pick_consistency(alive_replicas)
class AddressTranslator(object):
"""
Interface for translating cluster-defined endpoints.
The driver discovers nodes using server metadata and topology change events. Normally,
the endpoint defined by the server is the right way to connect to a node. In some environments,
these addresses may not be reachable, or not preferred (public vs. private IPs in cloud environments,
suboptimal routing, etc.). This interface allows for translating from server-defined endpoints to
preferred addresses for driver connections.
*Note:* :attr:`~Cluster.contact_points` provided while creating the :class:`~.Cluster` instance are not
translated using this mechanism -- only addresses received from Cassandra nodes are.
"""
def translate(self, addr):
"""
Accepts the node IP address and returns a translated address to be used when connecting to this node.
"""
raise NotImplementedError
class IdentityTranslator(AddressTranslator):
"""
Returns the endpoint with no translation.
"""
def translate(self, addr):
return addr
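A useful translator overrides translate() with an environment-specific mapping; the addresses below are illustrative only:

    from cassandra.cluster import Cluster
    from cassandra.policies import AddressTranslator

    class StaticTranslator(AddressTranslator):
        # hypothetical private->public mapping for a NATed deployment
        MAPPING = {'10.0.0.1': '203.0.113.1',
                   '10.0.0.2': '203.0.113.2'}

        def translate(self, addr):
            return self.MAPPING.get(addr, addr)

    cluster = Cluster(['203.0.113.1'], address_translator=StaticTranslator())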

View File

@@ -48,7 +48,21 @@ class Host(object):
address = None
"""
The IP address or hostname of the node.
The IP address of the node. This is the RPC address the driver uses when connecting to the node.
"""
broadcast_address = None
"""
broadcast address configured for the node, *if available* ('peer' in system.peers table).
This is not present in the ``system.local`` table for older versions of Cassandra. It is also not queried if
:attr:`~.Cluster.token_metadata_enabled` is ``False``.
"""
listen_address = None
"""
listen address configured for the node, *if available*. This is only available in the ``system.local`` table for newer
versions of Cassandra. It is also not queried if :attr:`~.Cluster.token_metadata_enabled` is ``False``.
Usually the same as ``broadcast_address`` unless configured differently in cassandra.yaml.
"""
conviction_policy = None

View File

@@ -22,7 +22,7 @@ import six
from six.moves import range
import io
from cassandra import type_codes
from cassandra import type_codes, DriverException
from cassandra import (Unavailable, WriteTimeout, ReadTimeout,
WriteFailure, ReadFailure, FunctionFailure,
AlreadyExists, InvalidRequest, Unauthorized,
@@ -589,7 +589,7 @@ class ResultMessage(_MessageType):
elif kind == RESULT_KIND_SCHEMA_CHANGE:
results = cls.recv_results_schema_change(f, protocol_version)
else:
raise Exception("Unknown RESULT kind: %d" % kind)
raise DriverException("Unknown RESULT kind: %d" % kind)
return cls(kind, results, paging_state)
@classmethod
@@ -971,7 +971,7 @@ class _ProtocolHandler(object):
"""
if flags & COMPRESSED_FLAG:
if decompressor is None:
raise Exception("No de-compressor available for compressed frame!")
raise RuntimeError("No de-compressor available for compressed frame!")
body = decompressor(body)
flags ^= COMPRESSED_FLAG

View File

@@ -1101,7 +1101,7 @@ else:
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
def not_windows(*args):
raise Exception("IPv6 addresses cannot be handled on Windows. "
raise OSError("IPv6 addresses cannot be handled on Windows. "
"Missing ctypes.windll")
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows

View File

@@ -25,6 +25,8 @@
.. autoattribute:: conviction_policy_factory
.. autoattribute:: address_translator
.. autoattribute:: connection_class
.. autoattribute:: metrics_enabled
@@ -49,6 +51,12 @@
.. autoattribute:: connect_timeout
.. autoattribute:: schema_metadata_enabled
:annotation: = True
.. autoattribute:: token_metadata_enabled
:annotation: = True
.. automethod:: connect
.. automethod:: shutdown

View File

@@ -24,6 +24,15 @@ Load Balancing
.. autoclass:: TokenAwarePolicy
:members:
Translating Server Node Addresses
---------------------------------
.. autoclass:: AddressTranslator
:members:
.. autoclass:: IdentityTranslator
:members:
Marking Hosts Up or Down
------------------------

View File

@@ -145,13 +145,13 @@ class MetricsTests(unittest.TestCase):
query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(Unavailable):
self.session.execute(query)
self.assertEqual(1, self.cluster.metrics.stats.unavailables)
self.assertEqual(2, self.cluster.metrics.stats.unavailables)
# Test write
query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL)
with self.assertRaises(Unavailable):
self.session.execute(query, timeout=None)
self.assertEqual(2, self.cluster.metrics.stats.unavailables)
self.assertEqual(4, self.cluster.metrics.stats.unavailables)
finally:
get_node(1).start(wait_other_notice=True, wait_for_binary_proto=True)
# Give some time for the cluster to come back up, for the next test

View File

@@ -25,7 +25,7 @@ from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS
from cassandra.cluster import ControlConnection, _Scheduler
from cassandra.pool import Host
from cassandra.policies import (SimpleConvictionPolicy, RoundRobinPolicy,
ConstantReconnectionPolicy)
ConstantReconnectionPolicy, IdentityTranslator)
PEER_IP = "foobar"
@@ -61,6 +61,7 @@ class MockCluster(object):
max_schema_agreement_wait = 5
load_balancing_policy = RoundRobinPolicy()
reconnection_policy = ConstantReconnectionPolicy(2)
address_translator = IdentityTranslator()
down_host = None
contact_points = []
is_shutdown = False

View File

@@ -483,7 +483,7 @@ class DCAwareRoundRobinPolicyTest(unittest.TestCase):
host_none = Host(1, SimpleConvictionPolicy)
# contact point is '1'
cluster = Mock(contact_points=[1])
cluster = Mock(contact_points_resolved=[1])
# contact DC first
policy = DCAwareRoundRobinPolicy()
@@ -916,14 +916,14 @@ class RetryPolicyTest(unittest.TestCase):
retry, consistency = policy.on_unavailable(
query=None, consistency=ONE,
required_replicas=1, alive_replicas=2, retry_num=0)
self.assertEqual(retry, RetryPolicy.RETHROW)
self.assertEqual(consistency, None)
self.assertEqual(retry, RetryPolicy.RETRY_NEXT_HOST)
self.assertEqual(consistency, ONE)
retry, consistency = policy.on_unavailable(
query=None, consistency=ONE,
required_replicas=10000, alive_replicas=1, retry_num=0)
self.assertEqual(retry, RetryPolicy.RETHROW)
self.assertEqual(consistency, None)
self.assertEqual(retry, RetryPolicy.RETRY_NEXT_HOST)
self.assertEqual(consistency, ONE)
class FallthroughRetryPolicyTest(unittest.TestCase):