ProtocolVersion 'enum'

This commit is contained in:
Adam Holmberg
2016-12-21 11:42:30 -06:00
parent 2e28fc27c9
commit f06267a09a
8 changed files with 81 additions and 62 deletions

View File

@@ -125,6 +125,56 @@ def consistency_value_to_name(value):
return ConsistencyLevel.value_to_name[value] if value is not None else "Not Set" return ConsistencyLevel.value_to_name[value] if value is not None else "Not Set"
class ProtocolVersion(object):
"""
Defines native protocol versions supported by this driver.
"""
V1 = 1
"""
v1, supported in Cassandra 1.2-->2.2
"""
V2 = 2
"""
v2, supported in Cassandra 2.0-->2.2;
added support for lightweight transactions, batch operations, and automatic query paging.
"""
V3 = 3
"""
v3, supported in Cassandra 2.1-->3.x+;
added support for protocol-level client-side timestamps (see :attr:`.Session.use_client_timestamp`),
serial consistency levels for :class:`~.BatchStatement`, and an improved connection pool.
"""
V4 = 4
"""
v4, supported in Cassandra 2.2-->3.x+;
added a number of new types, server warnings, new failure messages, and custom payloads. Details in the
`project docs <https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec>`_
"""
V5 = 5
"""
v5, in beta from 3.x+
"""
SUPPORTED_VERSIONS = (V5, V4, V3, V2, V1)
"""
A tuple of all supported protocol versions
"""
MIN_SUPPORTED = min(SUPPORTED_VERSIONS)
"""
Minimum protocol version supported by this driver.
"""
MAX_SUPPORTED = max(SUPPORTED_VERSIONS)
"""
Maximum protocol versioni supported by this driver.
"""
class SchemaChangeType(object): class SchemaChangeType(object):
DROPPED = 'DROPPED' DROPPED = 'DROPPED'
CREATED = 'CREATED' CREATED = 'CREATED'

View File

@@ -42,7 +42,7 @@ except ImportError:
from cassandra import (ConsistencyLevel, AuthenticationFailed, from cassandra import (ConsistencyLevel, AuthenticationFailed,
OperationTimedOut, UnsupportedOperation, OperationTimedOut, UnsupportedOperation,
SchemaTargetType, DriverException) SchemaTargetType, DriverException, ProtocolVersion)
from cassandra.connection import (ConnectionException, ConnectionShutdown, from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported) ConnectionHeartbeat, ProtocolVersionUnsupported)
from cassandra.cqltypes import UserType from cassandra.cqltypes import UserType
@@ -57,8 +57,7 @@ from cassandra.protocol import (QueryMessage, ResultMessage,
IsBootstrappingErrorMessage, IsBootstrappingErrorMessage,
BatchMessage, RESULT_KIND_PREPARED, BatchMessage, RESULT_KIND_PREPARED,
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS, RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
RESULT_KIND_SCHEMA_CHANGE, MIN_SUPPORTED_VERSION, RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler)
ProtocolHandler)
from cassandra.metadata import Metadata, protect_name, murmur3 from cassandra.metadata import Metadata, protect_name, murmur3
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance, ExponentialReconnectionPolicy, HostDistance,
@@ -355,45 +354,18 @@ class Cluster(object):
server will be automatically used. server will be automatically used.
""" """
protocol_version = 4 protocol_version = ProtocolVersion.V4
""" """
The maximum version of the native protocol to use. The maximum version of the native protocol to use.
See :class:`.ProtocolVersion` for more information about versions.
If not set in the constructor, the driver will automatically downgrade If not set in the constructor, the driver will automatically downgrade
version based on a negotiation with the server, but it is most efficient version based on a negotiation with the server, but it is most efficient
to set this to the maximum supported by your version of Cassandra. to set this to the maximum supported by your version of Cassandra.
Setting this will also prevent conflicting versions negotiated if your Setting this will also prevent conflicting versions negotiated if your
cluster is upgraded. cluster is upgraded.
Version 2 of the native protocol adds support for lightweight transactions,
batch operations, and automatic query paging. The v2 protocol is
supported by Cassandra 2.0+.
Version 3 of the native protocol adds support for protocol-level
client-side timestamps (see :attr:`.Session.use_client_timestamp`),
serial consistency levels for :class:`~.BatchStatement`, and an
improved connection pool.
Version 4 of the native protocol adds a number of new types, server warnings,
new failure messages, and custom payloads. Details in the
`project docs <https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec>`_
The following table describes the native protocol versions that
are supported by each version of Cassandra:
+-------------------+-------------------+
| Cassandra Version | Protocol Versions |
+===================+===================+
| 1.2 | 1 |
+-------------------+-------------------+
| 2.0 | 1, 2 |
+-------------------+-------------------+
| 2.1 | 1, 2, 3 |
+-------------------+-------------------+
| 2.2 | 1, 2, 3, 4 |
+-------------------+-------------------+
| 3.x | 3, 4 |
+-------------------+-------------------+
""" """
allow_beta_protocol_version = False allow_beta_protocol_version = False
@@ -1141,15 +1113,14 @@ class Cluster(object):
if self._protocol_version_explicit: if self._protocol_version_explicit:
raise DriverException("ProtocolError returned from server while using explicitly set client protocol_version %d" % (previous_version,)) raise DriverException("ProtocolError returned from server while using explicitly set client protocol_version %d" % (previous_version,))
new_version = previous_version - 1 try:
if new_version < self.protocol_version: new_version = next(v for v in sorted(ProtocolVersion.SUPPORTED_VERSIONS, reversed=True) if v < previous_version)
if new_version >= MIN_SUPPORTED_VERSION:
log.warning("Downgrading core protocol version from %d to %d for %s. " log.warning("Downgrading core protocol version from %d to %d for %s. "
"To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. " "To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. "
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_addr) "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_addr)
self.protocol_version = new_version self.protocol_version = new_version
else: except StopIteration:
raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION)) raise DriverException("Cannot downgrade protocol version below minimum supported version: %d" % (ProtocolVersion.MIN_SUPPORTED,))
def connect(self, keyspace=None, wait_for_all_pools=False): def connect(self, keyspace=None, wait_for_all_pools=False):
""" """

View File

@@ -37,7 +37,7 @@ if 'gevent.monkey' in sys.modules:
else: else:
from six.moves.queue import Queue, Empty # noqa from six.moves.queue import Queue, Empty # noqa
from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut, ProtocolVersion
from cassandra.marshal import int32_pack from cassandra.marshal import int32_pack
from cassandra.protocol import (ReadyMessage, AuthenticateMessage, OptionsMessage, from cassandra.protocol import (ReadyMessage, AuthenticateMessage, OptionsMessage,
StartupMessage, ErrorMessage, CredentialsMessage, StartupMessage, ErrorMessage, CredentialsMessage,
@@ -45,7 +45,7 @@ from cassandra.protocol import (ReadyMessage, AuthenticateMessage, OptionsMessag
InvalidRequestException, SupportedMessage, InvalidRequestException, SupportedMessage,
AuthResponseMessage, AuthChallengeMessage, AuthResponseMessage, AuthChallengeMessage,
AuthSuccessMessage, ProtocolException, AuthSuccessMessage, ProtocolException,
MAX_SUPPORTED_VERSION, RegisterMessage) RegisterMessage)
from cassandra.util import OrderedDict from cassandra.util import OrderedDict
@@ -197,7 +197,7 @@ class Connection(object):
out_buffer_size = 4096 out_buffer_size = 4096
cql_version = None cql_version = None
protocol_version = MAX_SUPPORTED_VERSION protocol_version = ProtocolVersion.MAX_SUPPORTED
keyspace = None keyspace = None
compression = True compression = True
@@ -252,7 +252,7 @@ class Connection(object):
def __init__(self, host='127.0.0.1', port=9042, authenticator=None, def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
ssl_options=None, sockopts=None, compression=True, ssl_options=None, sockopts=None, compression=True,
cql_version=None, protocol_version=MAX_SUPPORTED_VERSION, is_control_connection=False, cql_version=None, protocol_version=ProtocolVersion.MAX_SUPPORTED, is_control_connection=False,
user_type_map=None, connect_timeout=None, allow_beta_protocol_version=False): user_type_map=None, connect_timeout=None, allow_beta_protocol_version=False):
self.host = host self.host = host
self.port = port self.port = port
@@ -541,7 +541,7 @@ class Connection(object):
pos = len(buf) pos = len(buf)
if pos: if pos:
version = int_from_buf_item(buf[0]) & PROTOCOL_VERSION_MASK version = int_from_buf_item(buf[0]) & PROTOCOL_VERSION_MASK
if version > MAX_SUPPORTED_VERSION: if version > ProtocolVersion.MAX_SUPPORTED:
raise ProtocolError("This version of the driver does not support protocol version %d" % version) raise ProtocolError("This version of the driver does not support protocol version %d" % version)
frame_header = frame_header_v3 if version >= 3 else frame_header_v1_v2 frame_header = frame_header_v3 if version >= 3 else frame_header_v1_v2
# this frame header struct is everything after the version byte # this frame header struct is everything after the version byte

View File

@@ -55,9 +55,6 @@ class InternalError(Exception):
ColumnMetadata = namedtuple("ColumnMetadata", ['keyspace_name', 'table_name', 'name', 'type']) ColumnMetadata = namedtuple("ColumnMetadata", ['keyspace_name', 'table_name', 'name', 'type'])
MIN_SUPPORTED_VERSION = 1
MAX_SUPPORTED_VERSION = 5
HEADER_DIRECTION_TO_CLIENT = 0x80 HEADER_DIRECTION_TO_CLIENT = 0x80
HEADER_DIRECTION_MASK = 0x80 HEADER_DIRECTION_MASK = 0x80

View File

@@ -14,6 +14,9 @@
.. autoclass:: ConsistencyLevel .. autoclass:: ConsistencyLevel
:members: :members:
.. autoclass:: ProtocolVersion
:members:
.. autoclass:: UserFunctionDescriptor .. autoclass:: UserFunctionDescriptor
:members: :members:
:inherited-members: :inherited-members:

View File

@@ -14,9 +14,8 @@
import sys,logging, traceback, time import sys,logging, traceback, time
from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,\ from cassandra import (ConsistencyLevel, OperationTimedOut, ReadTimeout, WriteTimeout, ReadFailure, WriteFailure,
FunctionFailure FunctionFailure, ProtocolVersion)
from cassandra.protocol import MAX_SUPPORTED_VERSION
from cassandra.cluster import Cluster, NoHostAvailable from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.concurrent import execute_concurrent_with_args from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement from cassandra.query import SimpleStatement
@@ -70,7 +69,7 @@ class ClientExceptionTests(unittest.TestCase):
"Native protocol 4,0+ is required for custom payloads, currently using %r" "Native protocol 4,0+ is required for custom payloads, currently using %r"
% (PROTOCOL_VERSION,)) % (PROTOCOL_VERSION,))
try: try:
self.cluster = Cluster(protocol_version=MAX_SUPPORTED_VERSION, allow_beta_protocol_version=True) self.cluster = Cluster(protocol_version=ProtocolVersion.MAX_SUPPORTED, allow_beta_protocol_version=True)
self.session = self.cluster.connect() self.session = self.cluster.connect()
except NoHostAvailable: except NoHostAvailable:
log.info("Protocol Version 5 not supported,") log.info("Protocol Version 5 not supported,")

View File

@@ -30,7 +30,6 @@ from cassandra.concurrent import execute_concurrent
from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy, from cassandra.policies import (RoundRobinPolicy, ExponentialReconnectionPolicy,
RetryPolicy, SimpleConvictionPolicy, HostDistance, RetryPolicy, SimpleConvictionPolicy, HostDistance,
WhiteListRoundRobinPolicy, AddressTranslator) WhiteListRoundRobinPolicy, AddressTranslator)
from cassandra.protocol import MAX_SUPPORTED_VERSION
from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory from cassandra.query import SimpleStatement, TraceUnavailable, tuple_factory
from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\ from tests.integration import use_singledc, PROTOCOL_VERSION, get_server_versions, CASSANDRA_VERSION, execute_until_pass, execute_with_long_wait_retry, get_node,\
@@ -188,7 +187,7 @@ class ClusterTests(unittest.TestCase):
""" """
cluster = Cluster() cluster = Cluster()
self.assertLessEqual(cluster.protocol_version, MAX_SUPPORTED_VERSION) self.assertLessEqual(cluster.protocol_version, cassandra.ProtocolVersion.MAX_SUPPORTED)
session = cluster.connect() session = cluster.connect()
updated_protocol_version = session._protocol_version updated_protocol_version = session._protocol_version
updated_cluster_version = cluster.protocol_version updated_cluster_version = cluster.protocol_version
@@ -1107,7 +1106,7 @@ class BetaProtocolTest(unittest.TestCase):
@test_category connection @test_category connection
""" """
cluster = Cluster(protocol_version=MAX_SUPPORTED_VERSION, allow_beta_protocol_version=False) cluster = Cluster(protocol_version=cassandra.ProtocolVersion.MAX_SUPPORTED, allow_beta_protocol_version=False)
try: try:
with self.assertRaises(NoHostAvailable): with self.assertRaises(NoHostAvailable):
cluster.connect() cluster.connect()
@@ -1126,9 +1125,9 @@ class BetaProtocolTest(unittest.TestCase):
@test_category connection @test_category connection
""" """
cluster = Cluster(protocol_version=MAX_SUPPORTED_VERSION, allow_beta_protocol_version=True) cluster = Cluster(protocol_version=cassandra.ProtocolVersion.MAX_SUPPORTED, allow_beta_protocol_version=True)
session = cluster.connect() session = cluster.connect()
self.assertEqual(cluster.protocol_version, MAX_SUPPORTED_VERSION) self.assertEqual(cluster.protocol_version, cassandra.ProtocolVersion.MAX_SUPPORTED)
self.assertTrue(session.execute("select release_version from system.local")[0]) self.assertTrue(session.execute("select release_version from system.local")[0])

View File

@@ -13,8 +13,8 @@
# limitations under the License. # limitations under the License.
import sys import sys
from cassandra import ProtocolVersion
from cassandra.marshal import bitlength from cassandra.marshal import bitlength
from cassandra.protocol import MAX_SUPPORTED_VERSION
try: try:
import unittest2 as unittest import unittest2 as unittest
@@ -153,10 +153,10 @@ class UnmarshalTest(unittest.TestCase):
# Just verifying expected exception here # Just verifying expected exception here
f = converted_types[-1] f = converted_types[-1]
self.assertIsInstance(f, float) self.assertIsInstance(f, float)
self.assertRaises(TypeError, DecimalType.to_binary, f, MAX_SUPPORTED_VERSION) self.assertRaises(TypeError, DecimalType.to_binary, f, ProtocolVersion.MAX_SUPPORTED)
converted_types = converted_types[:-1] converted_types = converted_types[:-1]
for proto_ver in range(1, MAX_SUPPORTED_VERSION + 1): for proto_ver in range(1, ProtocolVersion.MAX_SUPPORTED + 1):
for n in converted_types: for n in converted_types:
expected = Decimal(n) expected = Decimal(n)
self.assertEqual(DecimalType.from_binary(DecimalType.to_binary(n, proto_ver), proto_ver), expected) self.assertEqual(DecimalType.from_binary(DecimalType.to_binary(n, proto_ver), proto_ver), expected)