Cluster._make_connection_kwargs accept overrides for internal defaults

Make CC use protocol.ProtocolHandler for internal connections, to avoid
specialized implementations.
This commit is contained in:
Adam Holmberg
2015-06-23 11:36:50 -05:00
parent 22b92df9f9
commit 0acb459da2

View File

@@ -60,7 +60,8 @@ from cassandra.protocol import (QueryMessage, ResultMessage,
IsBootstrappingErrorMessage, IsBootstrappingErrorMessage,
BatchMessage, RESULT_KIND_PREPARED, BatchMessage, RESULT_KIND_PREPARED,
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
RESULT_KIND_SCHEMA_CHANGE, MIN_SUPPORTED_VERSION, ProtocolHandler) RESULT_KIND_SCHEMA_CHANGE, MIN_SUPPORTED_VERSION,
ProtocolHandler)
from cassandra.metadata import Metadata, protect_name, murmur3 from cassandra.metadata import Metadata, protect_name, murmur3
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance, ExponentialReconnectionPolicy, HostDistance,
@@ -802,16 +803,16 @@ class Cluster(object):
def _make_connection_kwargs(self, address, kwargs_dict): def _make_connection_kwargs(self, address, kwargs_dict):
if self._auth_provider_callable: if self._auth_provider_callable:
kwargs_dict['authenticator'] = self._auth_provider_callable(address) kwargs_dict.setdefault('authenticator', self._auth_provider_callable(address))
kwargs_dict['port'] = self.port kwargs_dict.setdefault('port', self.port)
kwargs_dict['compression'] = self.compression kwargs_dict.setdefault('compression', self.compression)
kwargs_dict['sockopts'] = self.sockopts kwargs_dict.setdefault('sockopts', self.sockopts)
kwargs_dict['ssl_options'] = self.ssl_options kwargs_dict.setdefault('ssl_options', self.ssl_options)
kwargs_dict['cql_version'] = self.cql_version kwargs_dict.setdefault('cql_version', self.cql_version)
kwargs_dict['protocol_version'] = self.protocol_version kwargs_dict.setdefault('protocol_version', self.protocol_version)
kwargs_dict['user_type_map'] = self._user_types kwargs_dict.setdefault('user_type_map', self._user_types)
kwargs_dict['protocol_handler_class'] = self.protocol_handler_class kwargs_dict.setdefault('protocol_handler_class', self.protocol_handler_class)
return kwargs_dict return kwargs_dict
@@ -1371,7 +1372,7 @@ class Cluster(object):
log.debug("Preparing all known prepared statements against host %s", host) log.debug("Preparing all known prepared statements against host %s", host)
connection = None connection = None
try: try:
connection = self.connection_factory(host.address) connection = self.connection_factory(host.address, protocol_handler_class=ProtocolHandler)
try: try:
self.control_connection.wait_for_schema_agreement(connection) self.control_connection.wait_for_schema_agreement(connection)
except Exception: except Exception:
@@ -2130,7 +2131,7 @@ class ControlConnection(object):
while True: while True:
try: try:
connection = self._cluster.connection_factory(host.address, is_control_connection=True) connection = self._cluster.connection_factory(host.address, is_control_connection=True, protocol_handler_class=ProtocolHandler)
break break
except ProtocolVersionUnsupported as e: except ProtocolVersionUnsupported as e:
self._cluster.protocol_downgrade(host.address, e.startup_version) self._cluster.protocol_downgrade(host.address, e.startup_version)