Take the linter to kafka/codec.py
This commit is contained in:
@@ -1,8 +1,7 @@
|
||||
from io import BytesIO
|
||||
import gzip
|
||||
from io import BytesIO
|
||||
import struct
|
||||
|
||||
import six
|
||||
from six.moves import xrange
|
||||
|
||||
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
|
||||
@@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
|
||||
|
||||
try:
|
||||
import snappy
|
||||
_has_snappy = True
|
||||
_HAS_SNAPPY = True
|
||||
except ImportError:
|
||||
_has_snappy = False
|
||||
_HAS_SNAPPY = False
|
||||
|
||||
|
||||
def has_gzip():
|
||||
@@ -20,7 +19,7 @@ def has_gzip():
|
||||
|
||||
|
||||
def has_snappy():
|
||||
return _has_snappy
|
||||
return _HAS_SNAPPY
|
||||
|
||||
|
||||
def gzip_encode(payload):
|
||||
@@ -57,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
|
||||
"""Encodes the given data with snappy if xerial_compatible is set then the
|
||||
stream is encoded in a fashion compatible with the xerial snappy library
|
||||
|
||||
The block size (xerial_blocksize) controls how frequent the blocking occurs
|
||||
32k is the default in the xerial library.
|
||||
The block size (xerial_blocksize) controls how frequent the blocking
|
||||
occurs 32k is the default in the xerial library.
|
||||
|
||||
The format winds up being
|
||||
+-------------+------------+--------------+------------+--------------+
|
||||
@@ -73,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
|
||||
length will always be <= blocksize.
|
||||
"""
|
||||
|
||||
if not _has_snappy:
|
||||
if not has_snappy():
|
||||
raise NotImplementedError("Snappy codec is not available")
|
||||
|
||||
if xerial_compatible:
|
||||
@@ -84,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
|
||||
out = BytesIO()
|
||||
|
||||
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
|
||||
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
|
||||
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
|
||||
|
||||
out.write(header)
|
||||
for chunk in _chunker():
|
||||
@@ -123,13 +122,13 @@ def _detect_xerial_stream(payload):
|
||||
"""
|
||||
|
||||
if len(payload) > 16:
|
||||
header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
|
||||
header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
|
||||
return header == _XERIAL_V1_HEADER
|
||||
return False
|
||||
|
||||
|
||||
def snappy_decode(payload):
|
||||
if not _has_snappy:
|
||||
if not has_snappy():
|
||||
raise NotImplementedError("Snappy codec is not available")
|
||||
|
||||
if _detect_xerial_stream(payload):
|
||||
|
||||
Reference in New Issue
Block a user