Write xerial-formatted snappy by default; use buffers to reduce copies

This commit is contained in:
Dana Powers
2016-01-25 15:36:08 -08:00
parent 0dcd5f10b9
commit 7e09258409

View File

@@ -9,9 +9,8 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
try: try:
import snappy import snappy
_HAS_SNAPPY = True
except ImportError: except ImportError:
_HAS_SNAPPY = False snappy = None
try: try:
import lz4 import lz4
@@ -28,7 +27,7 @@ def has_gzip():
def has_snappy(): def has_snappy():
return _HAS_SNAPPY return snappy is not None
def has_lz4(): def has_lz4():
@@ -68,7 +67,7 @@ def gzip_decode(payload):
return result return result
def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024): def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
"""Encodes the given data with snappy compression. """Encodes the given data with snappy compression.
If xerial_compatible is set then the stream is encoded in a fashion If xerial_compatible is set then the stream is encoded in a fashion
@@ -97,28 +96,23 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024):
if not has_snappy(): if not has_snappy():
raise NotImplementedError("Snappy codec is not available") raise NotImplementedError("Snappy codec is not available")
if xerial_compatible: if not xerial_compatible:
def _chunker(): return snappy.compress(payload)
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
out = BytesIO() out = BytesIO()
for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
out.write(struct.pack('!' + fmt, dat))
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat # Chunk through buffers to avoid creating intermediate slice copies
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) for chunk in (buffer(payload, i, xerial_blocksize)
for i in xrange(0, len(payload), xerial_blocksize)):
out.write(header)
for chunk in _chunker():
block = snappy.compress(chunk) block = snappy.compress(chunk)
block_size = len(block) block_size = len(block)
out.write(struct.pack('!i', block_size)) out.write(struct.pack('!i', block_size))
out.write(block) out.write(block)
out.seek(0) return out.getvalue()
return out.read()
else:
return snappy.compress(payload)
def _detect_xerial_stream(payload): def _detect_xerial_stream(payload):