Simpler host connections for v3 protocol
This commit is contained in:
@@ -61,7 +61,8 @@ from cassandra.policies import (RoundRobinPolicy, SimpleConvictionPolicy,
|
||||
ExponentialReconnectionPolicy, HostDistance,
|
||||
RetryPolicy)
|
||||
from cassandra.pool import (_ReconnectionHandler, _HostReconnectionHandler,
|
||||
HostConnectionPool, NoConnectionsAvailable)
|
||||
HostConnectionPool, HostConnection,
|
||||
NoConnectionsAvailable)
|
||||
from cassandra.query import (SimpleStatement, PreparedStatement, BoundStatement,
|
||||
BatchStatement, bind_params, QueryTrace, Statement,
|
||||
named_tuple_factory, dict_factory)
|
||||
@@ -262,6 +263,15 @@ class Cluster(object):
|
||||
:class:`.policies.SimpleConvictionPolicy`.
|
||||
"""
|
||||
|
||||
connect_to_remote_hosts = True
|
||||
"""
|
||||
If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE`
|
||||
by the :attr:`~.Cluster.load_balancing_policy` will have a connection
|
||||
opened to them. Otherwise, they will not have a connection opened to them.
|
||||
|
||||
.. versionadded:: 2.1.0
|
||||
"""
|
||||
|
||||
metrics_enabled = False
|
||||
"""
|
||||
Whether or not metric collection is enabled. If enabled, :attr:`.metrics`
|
||||
@@ -465,12 +475,20 @@ class Cluster(object):
|
||||
return self._min_requests_per_connection[host_distance]
|
||||
|
||||
def set_min_requests_per_connection(self, host_distance, min_requests):
|
||||
if self.protocol_version >= 3:
|
||||
raise UnsupportedOperation(
|
||||
"Cluster.set_min_requests_per_connection() only has an effect "
|
||||
"when using protocol_version 1 or 2.")
|
||||
self._min_requests_per_connection[host_distance] = min_requests
|
||||
|
||||
def get_max_requests_per_connection(self, host_distance):
|
||||
return self._max_requests_per_connection[host_distance]
|
||||
|
||||
def set_max_requests_per_connection(self, host_distance, max_requests):
|
||||
if self.protocol_version >= 3:
|
||||
raise UnsupportedOperation(
|
||||
"Cluster.set_max_requests_per_connection() only has an effect "
|
||||
"when using protocol_version 1 or 2.")
|
||||
self._max_requests_per_connection[host_distance] = max_requests
|
||||
|
||||
def get_core_connections_per_host(self, host_distance):
|
||||
@@ -479,6 +497,9 @@ class Cluster(object):
|
||||
for each host with :class:`~.HostDistance` equal to `host_distance`.
|
||||
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
|
||||
:attr:`~HostDistance.REMOTE`.
|
||||
|
||||
This property is ignored if :attr:`~.Cluster.protocol_version` is
|
||||
3 or higher.
|
||||
"""
|
||||
return self._core_connections_per_host[host_distance]
|
||||
|
||||
@@ -488,7 +509,16 @@ class Cluster(object):
|
||||
for each host with :class:`~.HostDistance` equal to `host_distance`.
|
||||
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
|
||||
:attr:`~HostDistance.REMOTE`.
|
||||
|
||||
If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
|
||||
is not supported (there is always one connection per host, unless
|
||||
the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
|
||||
and using this will result in an :exc:`~.UnsupporteOperation`.
|
||||
"""
|
||||
if self.protocol_version >= 3:
|
||||
raise UnsupportedOperation(
|
||||
"Cluster.set_core_connections_per_host() only has an effect "
|
||||
"when using protocol_version 1 or 2.")
|
||||
old = self._core_connections_per_host[host_distance]
|
||||
self._core_connections_per_host[host_distance] = core_connections
|
||||
if old < core_connections:
|
||||
@@ -500,6 +530,9 @@ class Cluster(object):
|
||||
for each host with :class:`~.HostDistance` equal to `host_distance`.
|
||||
The default is 8 for :attr:`~HostDistance.LOCAL` and 2 for
|
||||
:attr:`~HostDistance.REMOTE`.
|
||||
|
||||
This property is ignored if :attr:`~.Cluster.protocol_version` is
|
||||
3 or higher.
|
||||
"""
|
||||
return self._max_connections_per_host[host_distance]
|
||||
|
||||
@@ -509,7 +542,16 @@ class Cluster(object):
|
||||
for each host with :class:`~.HostDistance` equal to `host_distance`.
|
||||
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
|
||||
:attr:`~HostDistance.REMOTE`.
|
||||
|
||||
If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
|
||||
is not supported (there is always one connection per host, unless
|
||||
the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
|
||||
and using this will result in an :exc:`~.UnsupporteOperation`.
|
||||
"""
|
||||
if self.protocol_version >= 3:
|
||||
raise UnsupportedOperation(
|
||||
"Cluster.set_max_connections_per_host() only has an effect "
|
||||
"when using protocol_version 1 or 2.")
|
||||
self._max_connections_per_host[host_distance] = max_connections
|
||||
|
||||
def connection_factory(self, address, *args, **kwargs):
|
||||
@@ -1341,14 +1383,17 @@ class Session(object):
|
||||
|
||||
def run_add_or_renew_pool():
|
||||
try:
|
||||
new_pool = HostConnectionPool(host, distance, self)
|
||||
if self._protocol_version >= 3:
|
||||
new_pool = HostConnection(host, distance, self)
|
||||
else:
|
||||
new_pool = HostConnectionPool(host, distance, self)
|
||||
except AuthenticationFailed as auth_exc:
|
||||
conn_exc = ConnectionException(str(auth_exc), host=host)
|
||||
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
|
||||
return False
|
||||
except Exception as conn_exc:
|
||||
log.warning("Failed to create connection pool for new host %s: %s",
|
||||
host, conn_exc)
|
||||
log.warning("Failed to create connection pool for new host %s:",
|
||||
host, exc_info=conn_exc)
|
||||
# the host itself will still be marked down, so we need to pass
|
||||
# a special flag to make sure the reconnector is created
|
||||
self.cluster.signal_connection_failure(
|
||||
|
@@ -20,7 +20,7 @@ import logging
|
||||
import re
|
||||
import socket
|
||||
import time
|
||||
from threading import RLock, Condition
|
||||
from threading import Lock, RLock, Condition
|
||||
import weakref
|
||||
try:
|
||||
from weakref import WeakSet
|
||||
@@ -29,6 +29,7 @@ except ImportError:
|
||||
|
||||
from cassandra import AuthenticationFailed
|
||||
from cassandra.connection import ConnectionException
|
||||
from cassandra.policies import HostDistance
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -283,11 +284,117 @@ class _HostReconnectionHandler(_ReconnectionHandler):
|
||||
return True
|
||||
|
||||
|
||||
class HostConnection(object):
|
||||
"""
|
||||
When using v3 of the native protocol, this is used instead of a connection
|
||||
pool per host (HostConnectionPool) due to the increased in-flight capacity
|
||||
of individual connections.
|
||||
"""
|
||||
|
||||
host = None
|
||||
host_distance = None
|
||||
is_shutdown = False
|
||||
|
||||
_session = None
|
||||
_connection = None
|
||||
_lock = None
|
||||
|
||||
def __init__(self, host, host_distance, session):
|
||||
self.host = host
|
||||
self.host_distance = host_distance
|
||||
self._session = weakref.proxy(session)
|
||||
self._lock = Lock()
|
||||
|
||||
if host_distance == HostDistance.IGNORED:
|
||||
log.debug("Not opening connection to ignored host %s", self.host)
|
||||
return
|
||||
elif host_distance == HostDistance.REMOTE and not session.cluster.connect_to_remote_hosts:
|
||||
log.debug("Not opening connection to remote host %s", self.host)
|
||||
return
|
||||
|
||||
log.debug("Initializing connection for host %s", self.host)
|
||||
self._connection = session.cluster.connection_factory(host.address)
|
||||
if session.keyspace:
|
||||
self._connection.set_keyspace_blocking(session.keyspace)
|
||||
log.debug("Finished initializing connection for host %s", self.host)
|
||||
|
||||
def borrow_connection(self, timeout):
|
||||
if self.is_shutdown:
|
||||
raise ConnectionException(
|
||||
"Pool for %s is shutdown" % (self.host,), self.host)
|
||||
|
||||
conn = self._connection
|
||||
if not conn:
|
||||
raise NoConnectionsAvailable()
|
||||
|
||||
with conn.lock:
|
||||
if conn.in_flight > conn.max_request_id:
|
||||
raise NoConnectionsAvailable("All request IDs are currently in use")
|
||||
conn.in_flight += 1
|
||||
return conn, conn.get_request_id()
|
||||
|
||||
def return_connection(self, connection):
|
||||
with connection.lock:
|
||||
connection.in_flight -= 1
|
||||
|
||||
if connection.is_defunct or connection.is_closed:
|
||||
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
|
||||
"marking host %s as down", id(connection), self.host)
|
||||
is_down = self._session.cluster.signal_connection_failure(
|
||||
self.host, connection.last_error, is_host_addition=False)
|
||||
if is_down:
|
||||
self.shutdown()
|
||||
else:
|
||||
self._connection = None
|
||||
with self._lock:
|
||||
if self._is_replacing:
|
||||
return
|
||||
self._is_replacing = True
|
||||
self._session.submit(self._replace, connection)
|
||||
|
||||
def _replace(self, connection):
|
||||
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
|
||||
conn = self._session.cluster.connection_factory(self.host.address)
|
||||
if self._session.keyspace:
|
||||
conn.set_keyspace_blocking(self._session.keyspace)
|
||||
self._connection = conn
|
||||
with self._lock:
|
||||
self._is_replacing = False
|
||||
|
||||
def shutdown(self):
|
||||
with self._lock:
|
||||
if self.is_shutdown:
|
||||
return
|
||||
else:
|
||||
self.is_shutdown = True
|
||||
|
||||
if self._connection:
|
||||
self._connection.close()
|
||||
|
||||
def _set_keyspace_for_all_conns(self, keyspace, callback):
|
||||
if self.is_shutdown or not self._connection:
|
||||
return
|
||||
|
||||
def connection_finished_setting_keyspace(conn, error):
|
||||
errors = [] if not error else [error]
|
||||
callback(self, errors)
|
||||
|
||||
self._connection.set_keyspace_async(keyspace, connection_finished_setting_keyspace)
|
||||
|
||||
def get_state(self):
|
||||
have_conn = self._connection is not None
|
||||
in_flight = self._connection.in_flight if have_conn else 0
|
||||
return "shutdown: %s, open: %s, in_flights: %s" % (self.is_shutdown, have_conn, in_flight)
|
||||
|
||||
|
||||
_MAX_SIMULTANEOUS_CREATION = 1
|
||||
_MIN_TRASH_INTERVAL = 10
|
||||
|
||||
|
||||
class HostConnectionPool(object):
|
||||
"""
|
||||
Used to pool connections to a host for v1 and v2 native protocol.
|
||||
"""
|
||||
|
||||
host = None
|
||||
host_distance = None
|
||||
|
Reference in New Issue
Block a user