diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 23cc1f11..b9183c94 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -194,7 +194,11 @@ class UserAggregateDescriptor(SignatureDescriptor): """ -class Unavailable(Exception): +class DriverException(Exception): + pass + + +class Unavailable(DriverException): """ There were not enough live replicas to satisfy the requested consistency level, so the coordinator node immediately failed the request without @@ -220,7 +224,7 @@ class Unavailable(Exception): 'alive_replicas': alive_replicas})) -class Timeout(Exception): +class Timeout(DriverException): """ Replicas failed to respond to the coordinator node before timing out. """ @@ -289,7 +293,7 @@ class WriteTimeout(Timeout): self.write_type = write_type -class CoordinationFailure(Exception): +class CoordinationFailure(DriverException): """ Replicas sent a failure to the coordinator. """ @@ -359,7 +363,7 @@ class WriteFailure(CoordinationFailure): self.write_type = write_type -class FunctionFailure(Exception): +class FunctionFailure(DriverException): """ User Defined Function failed during execution """ @@ -386,7 +390,7 @@ class FunctionFailure(Exception): Exception.__init__(self, summary_message) -class AlreadyExists(Exception): +class AlreadyExists(DriverException): """ An attempt was made to create a keyspace or table that already exists. """ @@ -414,7 +418,7 @@ class AlreadyExists(Exception): self.table = table -class InvalidRequest(Exception): +class InvalidRequest(DriverException): """ A query was made that was invalid for some reason, such as trying to set the keyspace for a connection to a nonexistent keyspace. @@ -422,21 +426,21 @@ class InvalidRequest(Exception): pass -class Unauthorized(Exception): +class Unauthorized(DriverException): """ The current user is not authorized to perfom the requested operation. """ pass -class AuthenticationFailed(Exception): +class AuthenticationFailed(DriverException): """ Failed to authenticate. """ pass -class OperationTimedOut(Exception): +class OperationTimedOut(DriverException): """ The operation took longer than the specified (client-side) timeout to complete. This is not an error generated by Cassandra, only @@ -460,7 +464,7 @@ class OperationTimedOut(Exception): Exception.__init__(self, message) -class UnsupportedOperation(Exception): +class UnsupportedOperation(DriverException): """ An attempt was made to use a feature that is not supported by the selected protocol version. 
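The new common DriverException base class changes how applications can structure error handling. A minimal sketch of the idea (the contact point is an illustrative address, not part of the patch; existing code that catches plain Exception keeps working because DriverException still derives from it):

from cassandra import DriverException, Unavailable
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])      # illustrative contact point
session = cluster.connect()
try:
    session.execute("SELECT release_version FROM system.local")
except Unavailable as exc:
    # Server-reported errors keep their specific types...
    print("not enough live replicas:", exc)
except DriverException as exc:
    # ...while Timeout, OperationTimedOut, AlreadyExists, etc. now share one base,
    # so a single except clause can catch anything the driver raises explicitly.
    print("driver error:", exc)
finally:
    cluster.shutdown()
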
See :attr:`Cluster.protocol_version` diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d8c1026e..a9dacff4 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -45,7 +45,7 @@ from itertools import groupby, count from cassandra import (ConsistencyLevel, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, - SchemaTargetType) + SchemaTargetType, DriverException) from cassandra.connection import (ConnectionException, ConnectionShutdown, ConnectionHeartbeat, ProtocolVersionUnsupported) from cassandra.cqltypes import UserType @@ -65,7 +65,7 @@ from cassandra.protocol import (QueryMessage, ResultMessage, from cassandra.metadata import Metadata, protect_name, murmur3 from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, ExponentialReconnectionPolicy, HostDistance, - RetryPolicy) + RetryPolicy, IdentityTranslator) from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler, HostConnectionPool, HostConnection, NoConnectionsAvailable) @@ -347,6 +347,12 @@ class Cluster(object): :class:`.policies.SimpleConvictionPolicy`. """ + address_translator = IdentityTranslator() + """ + :class:`.policies.AddressTranslator` instance to be used in translating server node addresses + to driver connection addresses. + """ + connect_to_remote_hosts = True """ If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE` @@ -481,6 +487,37 @@ class Cluster(object): establishment, options passing, and authentication. """ + @property + def schema_metadata_enabled(self): + """ + Flag indicating whether internal schema metadata is updated. + + When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This + can be used to speed initial connection, and reduce load on client and server during operation. Turning this off + gives away token aware request routing, and programmatic inspection of the metadata model. + """ + return self.control_connection._schema_meta_enabled + + @schema_metadata_enabled.setter + def schema_metadata_enabled(self, enabled): + self.control_connection._schema_meta_enabled = bool(enabled) + + @property + def token_metadata_enabled(self): + """ + Flag indicating whether internal token metadata is updated. + + When disabled, the driver does not query node token information on connect, or on topology change events. This + can be used to speed initial connection, and reduce load on client and server during operation. It is most useful + in large clusters using vnodes, where the token map can be expensive to compute. Turning this off + gives away token aware request routing, and programmatic inspection of the token ring. + """ + return self.control_connection._token_meta_enabled + + @token_metadata_enabled.setter + def token_metadata_enabled(self, enabled): + self.control_connection._token_meta_enabled = bool(enabled) + sessions = None control_connection = None scheduler = None @@ -520,7 +557,10 @@ class Cluster(object): idle_heartbeat_interval=30, schema_event_refresh_window=2, topology_event_refresh_window=10, - connect_timeout=5): + connect_timeout=5, + schema_metadata_enabled=True, + token_metadata_enabled=True, + address_translator=None): """ Any of the mutable Cluster attributes may be set as keyword arguments to the constructor. @@ -529,9 +569,15 @@ class Cluster(object): if isinstance(contact_points, six.string_types): raise TypeError("contact_points should not be a string, it should be a sequence (e.g. 
list) of strings") + if None in contact_points: + raise ValueError("contact_points should not contain None (it can resolve to localhost)") self.contact_points = contact_points self.port = port + + self.contact_points_resolved = [endpoint[4][0] for a in self.contact_points + for endpoint in socket.getaddrinfo(a, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)] + self.compression = compression self.protocol_version = protocol_version self.auth_provider = auth_provider @@ -539,7 +585,6 @@ class Cluster(object): if load_balancing_policy is not None: if isinstance(load_balancing_policy, type): raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class") - self.load_balancing_policy = load_balancing_policy else: self.load_balancing_policy = default_lbp_factory() @@ -547,13 +592,11 @@ class Cluster(object): if reconnection_policy is not None: if isinstance(reconnection_policy, type): raise TypeError("reconnection_policy should not be a class, it should be an instance of that class") - self.reconnection_policy = reconnection_policy if default_retry_policy is not None: if isinstance(default_retry_policy, type): raise TypeError("default_retry_policy should not be a class, it should be an instance of that class") - self.default_retry_policy = default_retry_policy if conviction_policy_factory is not None: @@ -561,6 +604,11 @@ class Cluster(object): raise ValueError("conviction_policy_factory must be callable") self.conviction_policy_factory = conviction_policy_factory + if address_translator is not None: + if isinstance(address_translator, type): + raise TypeError("address_translator should not be a class, it should be an instance of that class") + self.address_translator = address_translator + if connection_class is not None: self.connection_class = connection_class @@ -619,7 +667,9 @@ class Cluster(object): self.control_connection = ControlConnection( self, self.control_connection_timeout, - self.schema_event_refresh_window, self.topology_event_refresh_window) + self.schema_event_refresh_window, self.topology_event_refresh_window, + schema_metadata_enabled, token_metadata_enabled) + def register_user_type(self, keyspace, user_type, klass): """ @@ -813,7 +863,7 @@ class Cluster(object): log.warning("Downgrading core protocol version from %d to %d for %s", self.protocol_version, new_version, host_addr) self.protocol_version = new_version else: - raise Exception("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION)) + raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION)) def connect(self, keyspace=None): """ @@ -823,14 +873,14 @@ class Cluster(object): """ with self._lock: if self.is_shutdown: - raise Exception("Cluster is already shut down") + raise DriverException("Cluster is already shut down") if not self._is_setup: log.debug("Connecting to cluster, contact points: %s; protocol version: %s", self.contact_points, self.protocol_version) self.connection_class.initialize_reactor() atexit.register(partial(_shutdown_cluster, self)) - for address in self.contact_points: + for address in self.contact_points_resolved: host, new = self.add_host(address, signal=False) if new: host.set_up() @@ -1244,8 +1294,8 @@ class Cluster(object): An Exception is raised if schema refresh fails for any reason. 
""" - if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait): - raise Exception("Schema metadata was not refreshed. See log for details.") + if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("Schema metadata was not refreshed. See log for details.") def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None): """ @@ -1255,8 +1305,8 @@ class Cluster(object): See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior """ if not self.control_connection.refresh_schema(target_type=SchemaTargetType.KEYSPACE, keyspace=keyspace, - schema_agreement_wait=max_schema_agreement_wait): - raise Exception("Keyspace metadata was not refreshed. See log for details.") + schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("Keyspace metadata was not refreshed. See log for details.") def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None): """ @@ -1265,8 +1315,9 @@ class Cluster(object): See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior """ - if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table, schema_agreement_wait=max_schema_agreement_wait): - raise Exception("Table metadata was not refreshed. See log for details.") + if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table, + schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("Table metadata was not refreshed. See log for details.") def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreement_wait=None): """ @@ -1274,8 +1325,9 @@ class Cluster(object): See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior """ - if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view, schema_agreement_wait=max_schema_agreement_wait): - raise Exception("View metadata was not refreshed. See log for details.") + if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view, + schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("View metadata was not refreshed. See log for details.") def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_wait=None): """ @@ -1283,8 +1335,9 @@ class Cluster(object): See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior """ - if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type, schema_agreement_wait=max_schema_agreement_wait): - raise Exception("User Type metadata was not refreshed. See log for details.") + if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type, + schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("User Type metadata was not refreshed. 
See log for details.") def refresh_user_function_metadata(self, keyspace, function, max_schema_agreement_wait=None): """ @@ -1294,8 +1347,9 @@ class Cluster(object): See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior """ - if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function, schema_agreement_wait=max_schema_agreement_wait): - raise Exception("User Function metadata was not refreshed. See log for details.") + if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function, + schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("User Function metadata was not refreshed. See log for details.") def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreement_wait=None): """ @@ -1305,8 +1359,9 @@ class Cluster(object): See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior """ - if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate, schema_agreement_wait=max_schema_agreement_wait): - raise Exception("User Aggregate metadata was not refreshed. See log for details.") + if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate, + schema_agreement_wait=max_schema_agreement_wait, force=True): + raise DriverException("User Aggregate metadata was not refreshed. See log for details.") def refresh_nodes(self): """ @@ -1315,10 +1370,12 @@ class Cluster(object): An Exception is raised if node refresh fails for any reason. """ if not self.control_connection.refresh_node_list_and_token_map(): - raise Exception("Node list was not refreshed. See log for details.") + raise DriverException("Node list was not refreshed. See log for details.") def set_meta_refresh_enabled(self, enabled): """ + *Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` :attr:`~.Cluster.token_metadata_enabled` instead + Sets a flag to enable (True) or disable (False) all metadata refresh queries. This applies to both schema and node topology. @@ -1327,7 +1384,8 @@ class Cluster(object): Meta refresh must be enabled for the driver to become aware of any cluster topology changes or schema updates. 
""" - self.control_connection.set_meta_refresh_enabled(bool(enabled)) + self.schema_metadata_enabled = enabled + self.token_metadata_enabled = enabled def _prepare_all_queries(self, host): if not self._prepared_statements: @@ -2009,8 +2067,11 @@ class ControlConnection(object): Internal """ - _SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers" - _SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, release_version, schema_version FROM system.local WHERE key='local'" + _SELECT_PEERS = "SELECT * FROM system.peers" + _SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, schema_version FROM system.peers" + _SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'" + _SELECT_LOCAL_NO_TOKENS = "SELECT cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'" + _SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers" _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'" @@ -2022,14 +2083,17 @@ class ControlConnection(object): _schema_event_refresh_window = None _topology_event_refresh_window = None - _meta_refresh_enabled = True + _schema_meta_enabled = True + _token_meta_enabled = True # for testing purposes _time = time def __init__(self, cluster, timeout, schema_event_refresh_window, - topology_event_refresh_window): + topology_event_refresh_window, + schema_meta_enabled=True, + token_meta_enabled=True): # use a weak reference to allow the Cluster instance to be GC'ed (and # shutdown) since implementing __del__ disables the cycle detector self._cluster = weakref.proxy(cluster) @@ -2038,6 +2102,8 @@ class ControlConnection(object): self._schema_event_refresh_window = schema_event_refresh_window self._topology_event_refresh_window = topology_event_refresh_window + self._schema_meta_enabled = schema_meta_enabled + self._token_meta_enabled = token_meta_enabled self._lock = RLock() self._schema_agreement_lock = Lock() @@ -2119,8 +2185,10 @@ class ControlConnection(object): "SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change') }, register_timeout=self._timeout) - peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=ConsistencyLevel.ONE) - local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=ConsistencyLevel.ONE) + sel_peers = self._SELECT_PEERS if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS + sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS + peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE) + local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE) shared_results = connection.wait_for_responses( peers_query, local_query, timeout=self._timeout) @@ -2200,14 +2268,10 @@ class ControlConnection(object): self._connection.close() del self._connection - def refresh_schema(self, **kwargs): - if not self._meta_refresh_enabled: - log.debug("[control connection] Skipping schema refresh because meta refresh is disabled") - return False - + def refresh_schema(self, force=False, **kwargs): try: if self._connection: - return self._refresh_schema(self._connection, **kwargs) + return self._refresh_schema(self._connection, force=force, **kwargs) except ReferenceError: pass # our weak reference to the Cluster is no good except Exception: @@ -2215,13 +2279,18 @@ class ControlConnection(object): self._signal_error() return False - 
def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, **kwargs): + def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, force=False, **kwargs): if self._cluster.is_shutdown: return False agreed = self.wait_for_schema_agreement(connection, preloaded_results=preloaded_results, wait_time=schema_agreement_wait) + + if not self._schema_meta_enabled and not force: + log.debug("[control connection] Skipping schema refresh because schema metadata is disabled") + return False + if not agreed: log.debug("Skipping schema refresh due to lack of schema agreement") return False @@ -2231,10 +2300,6 @@ class ControlConnection(object): return True def refresh_node_list_and_token_map(self, force_token_rebuild=False): - if not self._meta_refresh_enabled: - log.debug("[control connection] Skipping node list refresh because meta refresh is disabled") - return False - try: if self._connection: self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild) @@ -2254,10 +2319,17 @@ class ControlConnection(object): peers_result = preloaded_results[0] local_result = preloaded_results[1] else: - log.debug("[control connection] Refreshing node list and token map") cl = ConsistencyLevel.ONE - peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=cl) - local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=cl) + if not self._token_meta_enabled: + log.debug("[control connection] Refreshing node list without token map") + sel_peers = self._SELECT_PEERS_NO_TOKENS + sel_local = self._SELECT_LOCAL_NO_TOKENS + else: + log.debug("[control connection] Refreshing node list and token map") + sel_peers = self._SELECT_PEERS + sel_local = self._SELECT_LOCAL + peers_query = QueryMessage(query=sel_peers, consistency_level=cl) + local_query = QueryMessage(query=sel_local, consistency_level=cl) peers_result, local_result = connection.wait_for_responses( peers_query, local_query, timeout=self._timeout) @@ -2277,6 +2349,8 @@ class ControlConnection(object): datacenter = local_row.get("data_center") rack = local_row.get("rack") self._update_location_info(host, datacenter, rack) + host.listen_address = local_row.get("listen_address") + host.broadcast_address = local_row.get("broadcast_address") partitioner = local_row.get("partitioner") tokens = local_row.get("tokens") @@ -2291,13 +2365,10 @@ class ControlConnection(object): should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None found_hosts = set() for row in peers_result: - addr = row.get("rpc_address") + addr = self._rpc_from_peer_row(row) - if not addr or addr in ["0.0.0.0", "::"]: - addr = row.get("peer") - - tokens = row.get("tokens") - if not tokens: + tokens = row.get("tokens", None) + if 'tokens' in row and not tokens: # it was selected, but empty log.warning("Excluding host (%s) with no tokens in system.peers table of %s." 
% (addr, connection.host)) continue @@ -2313,6 +2384,8 @@ class ControlConnection(object): else: should_rebuild_token_map |= self._update_location_info(host, datacenter, rack) + host.broadcast_address = row.get("peer") + if partitioner and tokens: token_map[host] = tokens @@ -2320,7 +2393,7 @@ class ControlConnection(object): if old_host.address != connection.host and old_host.address not in found_hosts: should_rebuild_token_map = True if old_host.address not in self._cluster.contact_points: - log.debug("[control connection] Found host that has been removed: %r", old_host) + log.debug("[control connection] Removing host not found in peers metadata: %r", old_host) self._cluster.remove_host(old_host) log.debug("[control connection] Finished fetching ring info") @@ -2356,7 +2429,7 @@ class ControlConnection(object): def _handle_topology_change(self, event): change_type = event["change_type"] - addr, port = event["address"] + addr = self._translate_address(event["address"][0]) if change_type == "NEW_NODE" or change_type == "MOVED_NODE": if self._topology_event_refresh_window >= 0: delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window) @@ -2367,7 +2440,7 @@ class ControlConnection(object): def _handle_status_change(self, event): change_type = event["change_type"] - addr, port = event["address"] + addr = self._translate_address(event["address"][0]) host = self._cluster.metadata.get_host(addr) if change_type == "UP": delay = 1 + self._delay_for_event_type('status_change', 0.5) # randomness to avoid thundering herd problem on events @@ -2385,6 +2458,9 @@ class ControlConnection(object): # this will be run by the scheduler self._cluster.on_down(host, is_host_addition=False) + def _translate_address(self, addr): + return self._cluster.address_translator.translate(addr) + def _handle_schema_change(self, event): if self._schema_event_refresh_window < 0: return @@ -2466,11 +2542,7 @@ class ControlConnection(object): schema_ver = row.get('schema_version') if not schema_ver: continue - - addr = row.get("rpc_address") - if not addr or addr in ["0.0.0.0", "::"]: - addr = row.get("peer") - + addr = self._rpc_from_peer_row(row) peer = self._cluster.metadata.get_host(addr) if peer and peer.is_up: versions[schema_ver].add(addr) @@ -2481,6 +2553,12 @@ class ControlConnection(object): return dict((version, list(nodes)) for version, nodes in six.iteritems(versions)) + def _rpc_from_peer_row(self, row): + addr = row.get("rpc_address") + if not addr or addr in ["0.0.0.0", "::"]: + addr = row.get("peer") + return self._translate_address(addr) + def _signal_error(self): with self._lock: if self._is_shutdown: @@ -2529,9 +2607,6 @@ class ControlConnection(object): if connection is self._connection and (connection.is_defunct or connection.is_closed): self.reconnect() - def set_meta_refresh_enabled(self, enabled): - self._meta_refresh_enabled = enabled - def _stop_scheduler(scheduler, thread): try: @@ -2626,13 +2701,9 @@ class _Scheduler(object): def refresh_schema_and_set_result(control_conn, response_future, **kwargs): try: - if control_conn._meta_refresh_enabled: - log.debug("Refreshing schema in response to schema change. " - "%s", kwargs) - response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs) - else: - log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; " - "%s", kwargs) + log.debug("Refreshing schema in response to schema change. 
" + "%s", kwargs) + response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs) except Exception: log.exception("Exception refreshing schema in response to schema change:") response_future.session.submit(control_conn.refresh_schema, **kwargs) @@ -2717,7 +2788,16 @@ class ResponseFuture(object): self._timer.cancel() def _on_timeout(self): - self._set_final_exception(OperationTimedOut(self._errors, self._current_host)) + errors = self._errors + if not errors: + if self.is_schema_agreed: + errors = {self._current_host.address: "Client request timeout. See Session.execute[_async](timeout)"} + else: + connection = getattr(self.session.cluster.control_connection, '_connection') + host = connection.host if connection else 'unknown' + errors = {host: "Request timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait."} + + self._set_final_exception(OperationTimedOut(errors, self._current_host)) def _make_query_plan(self): # convert the list/generator/etc to an iterator so that subsequent @@ -2809,7 +2889,7 @@ class ResponseFuture(object): """ # TODO: When timers are introduced, just make this wait if not self._event.is_set(): - raise Exception("warnings cannot be retrieved before ResponseFuture is finalized") + raise DriverException("warnings cannot be retrieved before ResponseFuture is finalized") return self._warnings @property @@ -2827,7 +2907,7 @@ class ResponseFuture(object): """ # TODO: When timers are introduced, just make this wait if not self._event.is_set(): - raise Exception("custom_payload cannot be retrieved before ResponseFuture is finalized") + raise DriverException("custom_payload cannot be retrieved before ResponseFuture is finalized") return self._custom_payload def start_fetching_next_page(self): @@ -2982,15 +3062,17 @@ class ResponseFuture(object): return retry_type, consistency = retry - if retry_type is RetryPolicy.RETRY: + if retry_type in (RetryPolicy.RETRY, RetryPolicy.RETRY_NEXT_HOST): self._query_retries += 1 - self._retry(reuse_connection=True, consistency_level=consistency) + reuse = retry_type == RetryPolicy.RETRY + self._retry(reuse_connection=reuse, consistency_level=consistency) elif retry_type is RetryPolicy.RETHROW: self._set_final_exception(response.to_exception()) else: # IGNORE if self._metrics is not None: self._metrics.on_ignore() self._set_final_result(None) + self._errors[self._current_host] = response.to_exception() elif isinstance(response, ConnectionException): if self._metrics is not None: self._metrics.on_connection_error() diff --git a/cassandra/connection.py b/cassandra/connection.py index 0624ad13..f292f4fa 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -125,6 +125,7 @@ class _Frame(object): NONBLOCKING = (errno.EAGAIN, errno.EWOULDBLOCK) + class ConnectionException(Exception): """ An unrecoverable error was hit when attempting to use a connection, @@ -319,12 +320,14 @@ class Connection(object): def _connect_socket(self): sockerr = None addresses = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM) + if not addresses: + raise ConnectionException("getaddrinfo returned empty list for %s" % (self.host,)) for (af, socktype, proto, canonname, sockaddr) in addresses: try: self._socket = self._socket_impl.socket(af, socktype, proto) if self.ssl_options: if not self._ssl_impl: - raise Exception("This version of Python was not compiled with SSL support") + raise RuntimeError("This version of Python was 
not compiled with SSL support") self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options) self._socket.settimeout(self.connect_timeout) self._socket.connect(sockaddr) diff --git a/cassandra/io/geventreactor.py b/cassandra/io/geventreactor.py index 6e62a38b..c825a8c1 100644 --- a/cassandra/io/geventreactor.py +++ b/cassandra/io/geventreactor.py @@ -14,17 +14,16 @@ import gevent import gevent.event from gevent.queue import Queue -from gevent import select, socket +from gevent import socket import gevent.ssl -from functools import partial import logging import os import time from six.moves import range -from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, EINVAL +from errno import EINVAL from cassandra.connection import Connection, ConnectionShutdown, Timer, TimerManager @@ -34,9 +33,8 @@ log = logging.getLogger(__name__) def is_timeout(err): return ( - err in (EINPROGRESS, EALREADY, EWOULDBLOCK) or (err == EINVAL and os.name in ('nt', 'ce')) or - isinstance(err, socket.timeout) + isinstance(err, socket.timeout) ) @@ -118,44 +116,23 @@ class GeventConnection(Connection): self.close() def handle_write(self): - run_select = partial(select.select, (), (self._socket,), ()) while True: try: next_msg = self._write_queue.get() - run_select() - except Exception as exc: - if not self.is_closed: - log.debug("Exception during write select() for %s: %s", self, exc) - self.defunct(exc) - return - - try: self._socket.sendall(next_msg) except socket.error as err: - log.debug("Exception during socket sendall for %s: %s", self, err) + log.debug("Exception in send for %s: %s", self, err) self.defunct(err) - return # Leave the write loop - - def handle_read(self): - run_select = partial(select.select, (self._socket,), (), ()) - while True: - try: - run_select() - except Exception as exc: - if not self.is_closed: - log.debug("Exception during read select() for %s: %s", self, exc) - self.defunct(exc) return + def handle_read(self): + while True: try: - while True: - buf = self._socket.recv(self.in_buffer_size) - self._iobuf.write(buf) - if len(buf) < self.in_buffer_size: - break + buf = self._socket.recv(self.in_buffer_size) + self._iobuf.write(buf) except socket.error as err: if not is_timeout(err): - log.debug("Exception during socket recv for %s: %s", self, err) + log.debug("Exception in read for %s: %s", self, err) self.defunct(err) return # leave the read loop diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 1d04b4c9..28e19dec 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -1402,8 +1402,10 @@ class TokenMap(object): with self._rebuild_lock: current = self.tokens_to_hosts_by_ks.get(keyspace, None) if (build_if_absent and current is None) or (not build_if_absent and current is not None): - replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace]) - self.tokens_to_hosts_by_ks[keyspace] = replica_map + ks_meta = self._metadata.keyspaces.get(keyspace) + if ks_meta: + replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace]) + self.tokens_to_hosts_by_ks[keyspace] = replica_map def replica_map_for_keyspace(self, ks_metadata): strategy = ks_metadata.replication_strategy diff --git a/cassandra/policies.py b/cassandra/policies.py index 595717ca..8eb97e20 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -16,12 +16,9 @@ from itertools import islice, cycle, groupby, repeat import logging from random import randint from threading import Lock -import six from cassandra import ConsistencyLevel -from six.moves 
import range - log = logging.getLogger(__name__) @@ -235,7 +232,7 @@ class DCAwareRoundRobinPolicy(LoadBalancingPolicy): self._dc_live_hosts[dc] = tuple(set(dc_hosts)) if not self.local_dc: - self._contact_points = cluster.contact_points + self._contact_points = cluster.contact_points_resolved self._position = randint(0, len(hosts) - 1) if hosts else 0 @@ -337,7 +334,7 @@ class TokenAwarePolicy(LoadBalancingPolicy): def check_supported(self): if not self._cluster_metadata.can_support_partitioner(): - raise Exception( + raise RuntimeError( '%s cannot be used with the cluster partitioner (%s) because ' 'the relevant C extension for this driver was not compiled. ' 'See the installation instructions for details on building ' @@ -554,8 +551,8 @@ class ExponentialReconnectionPolicy(ReconnectionPolicy): self.max_attempts = max_attempts def new_schedule(self): - i=0 - while self.max_attempts == None or i < self.max_attempts: + i = 0 + while self.max_attempts is None or i < self.max_attempts: yield min(self.base_delay * (2 ** i), self.max_delay) i += 1 @@ -650,6 +647,12 @@ class RetryPolicy(object): should be ignored but no more retries should be attempted. """ + RETRY_NEXT_HOST = 3 + """ + This should be returned from the below methods if the operation + should be retried on another connection. + """ + def on_read_timeout(self, query, consistency, required_responses, received_responses, data_retrieved, retry_num): """ @@ -677,11 +680,11 @@ class RetryPolicy(object): a sufficient number of replicas responded (with data digests). """ if retry_num != 0: - return (self.RETHROW, None) + return self.RETHROW, None elif received_responses >= required_responses and not data_retrieved: - return (self.RETRY, consistency) + return self.RETRY, consistency else: - return (self.RETHROW, None) + return self.RETHROW, None def on_write_timeout(self, query, consistency, write_type, required_responses, received_responses, retry_num): @@ -710,11 +713,11 @@ class RetryPolicy(object): :attr:`~.WriteType.BATCH_LOG`. """ if retry_num != 0: - return (self.RETHROW, None) + return self.RETHROW, None elif write_type == WriteType.BATCH_LOG: - return (self.RETRY, consistency) + return self.RETRY, consistency else: - return (self.RETHROW, None) + return self.RETHROW, None def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num): """ @@ -739,7 +742,7 @@ class RetryPolicy(object): By default, no retries will be attempted and the error will be re-raised. 
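RETRY_NEXT_HOST lets a policy ask the driver to replay the statement on the next host in the query plan; the ResponseFuture change in cluster.py above maps it to _retry(reuse_connection=False). A hypothetical custom policy using it (class name and behavior are illustrative, not part of the patch), which extends the idea to coordinator read timeouts:

from cassandra.cluster import Cluster
from cassandra.policies import RetryPolicy

class ReadTimeoutNextHostPolicy(RetryPolicy):
    # Hypothetical: on the first read timeout, try the same statement once on
    # another host instead of rethrowing; afterwards, give up as usual.
    def on_read_timeout(self, query, consistency, required_responses,
                        received_responses, data_retrieved, retry_num):
        if retry_num == 0:
            return self.RETRY_NEXT_HOST, consistency
        return self.RETHROW, None

cluster = Cluster(default_retry_policy=ReadTimeoutNextHostPolicy())
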
""" - return (self.RETHROW, None) + return (self.RETRY_NEXT_HOST, consistency) if retry_num == 0 else (self.RETHROW, None) class FallthroughRetryPolicy(RetryPolicy): @@ -749,13 +752,13 @@ class FallthroughRetryPolicy(RetryPolicy): """ def on_read_timeout(self, *args, **kwargs): - return (self.RETHROW, None) + return self.RETHROW, None def on_write_timeout(self, *args, **kwargs): - return (self.RETHROW, None) + return self.RETHROW, None def on_unavailable(self, *args, **kwargs): - return (self.RETHROW, None) + return self.RETHROW, None class DowngradingConsistencyRetryPolicy(RetryPolicy): @@ -807,45 +810,73 @@ class DowngradingConsistencyRetryPolicy(RetryPolicy): """ def _pick_consistency(self, num_responses): if num_responses >= 3: - return (self.RETRY, ConsistencyLevel.THREE) + return self.RETRY, ConsistencyLevel.THREE elif num_responses >= 2: - return (self.RETRY, ConsistencyLevel.TWO) + return self.RETRY, ConsistencyLevel.TWO elif num_responses >= 1: - return (self.RETRY, ConsistencyLevel.ONE) + return self.RETRY, ConsistencyLevel.ONE else: - return (self.RETHROW, None) + return self.RETHROW, None def on_read_timeout(self, query, consistency, required_responses, received_responses, data_retrieved, retry_num): if retry_num != 0: - return (self.RETHROW, None) + return self.RETHROW, None elif received_responses < required_responses: return self._pick_consistency(received_responses) elif not data_retrieved: - return (self.RETRY, consistency) + return self.RETRY, consistency else: - return (self.RETHROW, None) + return self.RETHROW, None def on_write_timeout(self, query, consistency, write_type, required_responses, received_responses, retry_num): if retry_num != 0: - return (self.RETHROW, None) + return self.RETHROW, None if write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER): if received_responses > 0: # persisted on at least one replica - return (self.IGNORE, None) + return self.IGNORE, None else: - return (self.RETHROW, None) + return self.RETHROW, None elif write_type == WriteType.UNLOGGED_BATCH: return self._pick_consistency(received_responses) elif write_type == WriteType.BATCH_LOG: - return (self.RETRY, consistency) + return self.RETRY, consistency - return (self.RETHROW, None) + return self.RETHROW, None def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num): if retry_num != 0: - return (self.RETHROW, None) + return self.RETHROW, None else: return self._pick_consistency(alive_replicas) + + +class AddressTranslator(object): + """ + Interface for translating cluster-defined endpoints. + + The driver discovers nodes using server metadata and topology change events. Normally, + the endpoint defined by the server is the right way to connect to a node. In some environments, + these addresses may not be reachable, or not preferred (public vs. private IPs in cloud environments, + suboptimal routing, etc). This interface allows for translating from server defined endpoints to + preferred addresses for driver connections. + + *Note:* :attr:`~Cluster.contact_points` provided while creating the :class:`~.Cluster` instance are not + translated using this mechanism -- only addresses received from Cassandra nodes are. + """ + def translate(self, addr): + """ + Accepts the node ip address, and returns a translated address to be used connecting to this node. 
+ """ + raise NotImplementedError + + +class IdentityTranslator(AddressTranslator): + """ + Returns the endpoint with no translation + """ + def translate(self, addr): + return addr diff --git a/cassandra/pool.py b/cassandra/pool.py index 08cfd372..3cb14e91 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -48,7 +48,21 @@ class Host(object): address = None """ - The IP address or hostname of the node. + The IP address of the node. This is the RPC address the driver uses when connecting to the node + """ + + broadcast_address = None + """ + broadcast address configured for the node, *if available* ('peer' in system.peers table). + This is not present in the ``system.local`` table for older versions of Cassandra. It is also not queried if + :attr:`~.Cluster.token_metadata_enabled` is ``False``. + """ + + listen_address = None + """ + listen address configured for the node, *if available*. This is only available in the ``system.local`` table for newer + versions of Cassandra. It is also not queried if :attr:`~.Cluster.token_metadata_enabled` is ``False``. + Usually the same as ``broadcast_address`` unless configured differently in cassandra.yaml. """ conviction_policy = None diff --git a/cassandra/protocol.py b/cassandra/protocol.py index d2b94673..ede6cc58 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -22,7 +22,7 @@ import six from six.moves import range import io -from cassandra import type_codes +from cassandra import type_codes, DriverException from cassandra import (Unavailable, WriteTimeout, ReadTimeout, WriteFailure, ReadFailure, FunctionFailure, AlreadyExists, InvalidRequest, Unauthorized, @@ -589,7 +589,7 @@ class ResultMessage(_MessageType): elif kind == RESULT_KIND_SCHEMA_CHANGE: results = cls.recv_results_schema_change(f, protocol_version) else: - raise Exception("Unknown RESULT kind: %d" % kind) + raise DriverException("Unknown RESULT kind: %d" % kind) return cls(kind, results, paging_state) @classmethod @@ -971,7 +971,7 @@ class _ProtocolHandler(object): """ if flags & COMPRESSED_FLAG: if decompressor is None: - raise Exception("No de-compressor available for compressed frame!") + raise RuntimeError("No de-compressor available for compressed frame!") body = decompressor(body) flags ^= COMPRESSED_FLAG diff --git a/cassandra/util.py b/cassandra/util.py index ab6968e0..c4837af4 100644 --- a/cassandra/util.py +++ b/cassandra/util.py @@ -1101,7 +1101,7 @@ else: WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA else: def not_windows(*args): - raise Exception("IPv6 addresses cannot be handled on Windows. " + raise OSError("IPv6 addresses cannot be handled on Windows. " "Missing ctypes.windll") WSAStringToAddressA = not_windows WSAAddressToStringA = not_windows diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index 9c546d3b..fdea3e67 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -25,6 +25,8 @@ .. autoattribute:: conviction_policy_factory + .. autoattribute:: address_translator + .. autoattribute:: connection_class .. autoattribute:: metrics_enabled @@ -49,6 +51,12 @@ .. autoattribute:: connect_timeout + .. autoattribute:: schema_metadata_enabled + :annotation: = True + + .. autoattribute:: token_metadata_enabled + :annotation: = True + .. automethod:: connect .. 
automethod:: shutdown diff --git a/docs/api/cassandra/policies.rst b/docs/api/cassandra/policies.rst index 44346c4b..c96e491a 100644 --- a/docs/api/cassandra/policies.rst +++ b/docs/api/cassandra/policies.rst @@ -24,6 +24,15 @@ Load Balancing .. autoclass:: TokenAwarePolicy :members: +Translating Server Node Addresses +--------------------------------- + +.. autoclass:: AddressTranslator + :members: + +.. autoclass:: IdentityTranslator + :members: + Marking Hosts Up or Down ------------------------ diff --git a/tests/integration/standard/test_metrics.py b/tests/integration/standard/test_metrics.py index 48627b3f..13758b65 100644 --- a/tests/integration/standard/test_metrics.py +++ b/tests/integration/standard/test_metrics.py @@ -145,13 +145,13 @@ class MetricsTests(unittest.TestCase): query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(Unavailable): self.session.execute(query) - self.assertEqual(1, self.cluster.metrics.stats.unavailables) + self.assertEqual(2, self.cluster.metrics.stats.unavailables) # Test write query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL) with self.assertRaises(Unavailable): self.session.execute(query, timeout=None) - self.assertEqual(2, self.cluster.metrics.stats.unavailables) + self.assertEqual(4, self.cluster.metrics.stats.unavailables) finally: get_node(1).start(wait_other_notice=True, wait_for_binary_proto=True) # Give some time for the cluster to come back up, for the next test diff --git a/tests/unit/test_control_connection.py b/tests/unit/test_control_connection.py index 9fac7e2f..e109a76f 100644 --- a/tests/unit/test_control_connection.py +++ b/tests/unit/test_control_connection.py @@ -25,7 +25,7 @@ from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS from cassandra.cluster import ControlConnection, _Scheduler from cassandra.pool import Host from cassandra.policies import (SimpleConvictionPolicy, RoundRobinPolicy, - ConstantReconnectionPolicy) + ConstantReconnectionPolicy, IdentityTranslator) PEER_IP = "foobar" @@ -61,6 +61,7 @@ class MockCluster(object): max_schema_agreement_wait = 5 load_balancing_policy = RoundRobinPolicy() reconnection_policy = ConstantReconnectionPolicy(2) + address_translator = IdentityTranslator() down_host = None contact_points = [] is_shutdown = False diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index a9406cf7..6640ccf1 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -483,7 +483,7 @@ class DCAwareRoundRobinPolicyTest(unittest.TestCase): host_none = Host(1, SimpleConvictionPolicy) # contact point is '1' - cluster = Mock(contact_points=[1]) + cluster = Mock(contact_points_resolved=[1]) # contact DC first policy = DCAwareRoundRobinPolicy() @@ -916,14 +916,14 @@ class RetryPolicyTest(unittest.TestCase): retry, consistency = policy.on_unavailable( query=None, consistency=ONE, required_replicas=1, alive_replicas=2, retry_num=0) - self.assertEqual(retry, RetryPolicy.RETHROW) - self.assertEqual(consistency, None) + self.assertEqual(retry, RetryPolicy.RETRY_NEXT_HOST) + self.assertEqual(consistency, ONE) retry, consistency = policy.on_unavailable( query=None, consistency=ONE, required_replicas=10000, alive_replicas=1, retry_num=0) - self.assertEqual(retry, RetryPolicy.RETHROW) - self.assertEqual(consistency, None) + self.assertEqual(retry, RetryPolicy.RETRY_NEXT_HOST) + self.assertEqual(consistency, ONE) class FallthroughRetryPolicyTest(unittest.TestCase):
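For the address translation hooks documented above, a sketch of a custom translator (the class name and mapping are illustrative). Per the AddressTranslator docstring, only addresses learned from the cluster (system.peers / system.local rows and topology or status events) are translated; contact points are used as given, and the Cluster constructor requires an instance rather than the class:

from cassandra.cluster import Cluster
from cassandra.policies import AddressTranslator

class StaticMapTranslator(AddressTranslator):
    """Map private node addresses reported by the cluster to reachable ones."""
    def __init__(self, mapping):
        self._mapping = mapping

    def translate(self, addr):
        # Fall back to the untranslated address, mirroring IdentityTranslator
        return self._mapping.get(addr, addr)

cluster = Cluster(
    contact_points=["203.0.113.10"],                                  # placeholder
    address_translator=StaticMapTranslator({"10.0.0.5": "203.0.113.5"}),
)
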
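The revised ResponseFuture._on_timeout in cluster.py also makes purely client-side timeouts easier to diagnose: when no server error was collected before the timer fired, the exception's errors mapping now carries an explanatory per-host message instead of being empty. A sketch (the tiny timeout is chosen only to provoke the condition, and the errors/last_host attributes are the usual OperationTimedOut fields, not added by this patch):

from cassandra import OperationTimedOut
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])                      # placeholder contact point
session = cluster.connect()
try:
    session.execute("SELECT * FROM system.local", timeout=0.001)
except OperationTimedOut as exc:
    # e.g. {'127.0.0.1': 'Client request timeout. See Session.execute[_async](timeout)'}
    print(exc.errors, exc.last_host)
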