Prefer lz4 compression, allow specifying compression type
This is related to #1
This commit is contained in:
@@ -2,6 +2,11 @@
|
||||
=====
|
||||
In Progress
|
||||
|
||||
Features
|
||||
--------
|
||||
* Allow a specific compression type to be requested for communications with
|
||||
Cassandra and prefer lz4 if available
|
||||
|
||||
Bug Fixes
|
||||
---------
|
||||
* Update token metadata (for TokenAware calculations) when a node is removed
|
||||
|
||||
@@ -161,8 +161,15 @@ class Cluster(object):
|
||||
|
||||
compression = True
|
||||
"""
|
||||
Whether or not compression should be enabled when possible. Defaults to
|
||||
:const:`True` and attempts to use snappy compression.
|
||||
Controls compression for communications between the driver and Cassandra.
|
||||
If left as the default of :const:`True`, either lz4 or snappy compression
|
||||
may be used, depending on what is supported by both the driver
|
||||
and Cassandra. If both are fully supported, lz4 will be preferred.
|
||||
|
||||
You may also set this to 'snappy' or 'lz4' to request that specific
|
||||
compression type.
|
||||
|
||||
Setting this to :const:`False` disables compression.
|
||||
"""
|
||||
|
||||
auth_provider = None
|
||||
|
||||
@@ -31,23 +31,12 @@ from cassandra.decoder import (ReadyMessage, AuthenticateMessage, OptionsMessage
|
||||
StartupMessage, ErrorMessage, CredentialsMessage,
|
||||
QueryMessage, ResultMessage, decode_response,
|
||||
InvalidRequestException, SupportedMessage)
|
||||
from cassandra.util import OrderedDict
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
locally_supported_compressions = {}
|
||||
|
||||
try:
|
||||
import snappy
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
# work around apparently buggy snappy decompress
|
||||
def decompress(byts):
|
||||
if byts == '\x00':
|
||||
return ''
|
||||
return snappy.decompress(byts)
|
||||
locally_supported_compressions['snappy'] = (snappy.compress, decompress)
|
||||
locally_supported_compressions = OrderedDict()
|
||||
|
||||
try:
|
||||
import lz4
|
||||
@@ -69,6 +58,18 @@ else:
|
||||
|
||||
locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
|
||||
|
||||
try:
|
||||
import snappy
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
# work around apparently buggy snappy decompress
|
||||
def decompress(byts):
|
||||
if byts == '\x00':
|
||||
return ''
|
||||
return snappy.decompress(byts)
|
||||
locally_supported_compressions['snappy'] = (snappy.compress, decompress)
|
||||
|
||||
|
||||
MAX_STREAM_PER_CONNECTION = 127
|
||||
|
||||
@@ -369,7 +370,22 @@ class Connection(object):
|
||||
locally_supported_compressions.keys(),
|
||||
remote_supported_compressions)
|
||||
else:
|
||||
compression_type = iter(overlap).next() # choose any
|
||||
compression_type = None
|
||||
if isinstance(self.compression, basestring):
|
||||
# the user picked a specific compression type ('snappy' or 'lz4')
|
||||
if self.compression not in remote_supported_compressions:
|
||||
raise ProtocolError(
|
||||
"The requested compression type (%s) is not supported by the Cassandra server at %s"
|
||||
% (self.compression, self.host))
|
||||
compression_type = self.compression
|
||||
else:
|
||||
# our locally supported compressions are ordered to prefer
|
||||
# lz4, if available
|
||||
for k in locally_supported_compressions.keys():
|
||||
if k in overlap:
|
||||
compression_type = k
|
||||
break
|
||||
|
||||
# set the decompressor here, but set the compressor only after
|
||||
# a successful Ready message
|
||||
self._compressor, self.decompressor = \
|
||||
|
||||
@@ -24,7 +24,7 @@ from mock import Mock, ANY
|
||||
|
||||
from cassandra.connection import (Connection, PROTOCOL_VERSION,
|
||||
HEADER_DIRECTION_TO_CLIENT,
|
||||
HEADER_DIRECTION_FROM_CLIENT, ProtocolError)
|
||||
HEADER_DIRECTION_FROM_CLIENT, ProtocolError, locally_supported_compressions)
|
||||
from cassandra.decoder import (write_stringmultimap, write_int, write_string,
|
||||
SupportedMessage)
|
||||
from cassandra.marshal import uint8_pack, uint32_pack
|
||||
@@ -144,6 +144,123 @@ class ConnectionTest(unittest.TestCase):
|
||||
args, kwargs = c.defunct.call_args
|
||||
self.assertIsInstance(args[0], ProtocolError)
|
||||
|
||||
def test_prefer_lz4_compression(self, *args):
|
||||
c = self.make_connection()
|
||||
c._id_queue.get_nowait()
|
||||
c._callbacks = {0: c._handle_options_response}
|
||||
c.defunct = Mock()
|
||||
c.cql_version = "3.0.3"
|
||||
|
||||
locally_supported_compressions.pop('lz4', None)
|
||||
locally_supported_compressions.pop('snappy', None)
|
||||
locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
|
||||
locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
|
||||
|
||||
# read in a SupportedMessage response
|
||||
header = self.make_header_prefix(SupportedMessage)
|
||||
|
||||
options_buf = StringIO()
|
||||
write_stringmultimap(options_buf, {
|
||||
'CQL_VERSION': ['3.0.3'],
|
||||
'COMPRESSION': ['snappy', 'lz4']
|
||||
})
|
||||
options = options_buf.getvalue()
|
||||
|
||||
message = self.make_msg(header, options)
|
||||
c.process_msg(message, len(message) - 8)
|
||||
|
||||
self.assertEqual(c.decompressor, locally_supported_compressions['lz4'][1])
|
||||
|
||||
def test_requested_compression_not_available(self, *args):
|
||||
c = self.make_connection()
|
||||
c._id_queue.get_nowait()
|
||||
c._callbacks = {0: c._handle_options_response}
|
||||
c.defunct = Mock()
|
||||
# request lz4 compression
|
||||
c.compression = "lz4"
|
||||
|
||||
locally_supported_compressions.pop('lz4', None)
|
||||
locally_supported_compressions.pop('snappy', None)
|
||||
locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
|
||||
locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
|
||||
|
||||
# read in a SupportedMessage response
|
||||
header = self.make_header_prefix(SupportedMessage)
|
||||
|
||||
# the server only supports snappy
|
||||
options_buf = StringIO()
|
||||
write_stringmultimap(options_buf, {
|
||||
'CQL_VERSION': ['3.0.3'],
|
||||
'COMPRESSION': ['snappy']
|
||||
})
|
||||
options = options_buf.getvalue()
|
||||
|
||||
message = self.make_msg(header, options)
|
||||
c.process_msg(message, len(message) - 8)
|
||||
|
||||
# make sure it errored correctly
|
||||
c.defunct.assert_called_once_with(ANY)
|
||||
args, kwargs = c.defunct.call_args
|
||||
self.assertIsInstance(args[0], ProtocolError)
|
||||
|
||||
def test_use_requested_compression(self, *args):
|
||||
c = self.make_connection()
|
||||
c._id_queue.get_nowait()
|
||||
c._callbacks = {0: c._handle_options_response}
|
||||
c.defunct = Mock()
|
||||
# request snappy compression
|
||||
c.compression = "snappy"
|
||||
|
||||
locally_supported_compressions.pop('lz4', None)
|
||||
locally_supported_compressions.pop('snappy', None)
|
||||
locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
|
||||
locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
|
||||
|
||||
# read in a SupportedMessage response
|
||||
header = self.make_header_prefix(SupportedMessage)
|
||||
|
||||
# the server only supports snappy
|
||||
options_buf = StringIO()
|
||||
write_stringmultimap(options_buf, {
|
||||
'CQL_VERSION': ['3.0.3'],
|
||||
'COMPRESSION': ['snappy', 'lz4']
|
||||
})
|
||||
options = options_buf.getvalue()
|
||||
|
||||
message = self.make_msg(header, options)
|
||||
c.process_msg(message, len(message) - 8)
|
||||
|
||||
self.assertEqual(c.decompressor, locally_supported_compressions['snappy'][1])
|
||||
|
||||
def test_disable_compression(self, *args):
|
||||
c = self.make_connection()
|
||||
c._id_queue.get_nowait()
|
||||
c._callbacks = {0: c._handle_options_response}
|
||||
c.defunct = Mock()
|
||||
# disable compression
|
||||
c.compression = False
|
||||
|
||||
locally_supported_compressions.pop('lz4', None)
|
||||
locally_supported_compressions.pop('snappy', None)
|
||||
locally_supported_compressions['lz4'] = ('lz4compress', 'lz4decompress')
|
||||
locally_supported_compressions['snappy'] = ('snappycompress', 'snappydecompress')
|
||||
|
||||
# read in a SupportedMessage response
|
||||
header = self.make_header_prefix(SupportedMessage)
|
||||
|
||||
# the server only supports snappy
|
||||
options_buf = StringIO()
|
||||
write_stringmultimap(options_buf, {
|
||||
'CQL_VERSION': ['3.0.3'],
|
||||
'COMPRESSION': ['snappy', 'lz4']
|
||||
})
|
||||
options = options_buf.getvalue()
|
||||
|
||||
message = self.make_msg(header, options)
|
||||
c.process_msg(message, len(message) - 8)
|
||||
|
||||
self.assertEqual(c.decompressor, None)
|
||||
|
||||
def test_not_implemented(self):
|
||||
"""
|
||||
Ensure the following methods throw NIE's. If not, come back and test them.
|
||||
|
||||
Reference in New Issue
Block a user