Merge branch 'master' into develop

Conflicts:
	test/test_unit.py
Author: mrtheb
Date: 2014-03-17 15:43:00 -04:00
10 changed files with 302 additions and 51 deletions

README.md
View File

@@ -29,7 +29,7 @@ from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
# To send messages synchronously
producer = SimpleProducer(kafka)
@@ -80,7 +80,7 @@ from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
# HashedPartitioner is default
producer = KeyedProducer(kafka)
@@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
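All four README hunks make the same change: the old (host, port) argument pair becomes a single bootstrap string. A minimal end-to-end sketch of the updated low-level usage, assembled from the snippets above (the ProduceRequest and KafkaProtocol imports are filled in from kafka.common and kafka.protocol as used elsewhere in this diff; broker names are illustrative):

```python
from kafka.client import KafkaClient
from kafka.common import ProduceRequest
from kafka.protocol import KafkaProtocol

# One or more bootstrap brokers, comma-separated; a bare "host"
# defaults to port 9092 (see collect_hosts in kafka/conn.py below).
kafka = KafkaClient("kafka01:9092,kafka02:9092")

req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
```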

example.py
View File

@@ -14,7 +14,7 @@ def consume_example(client):
print(message)
def main():
client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")
produce_example(client)
consume_example(client)

View File

@@ -18,7 +18,7 @@ There are a few levels of abstraction:
# Possible API
client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")
producer = KafkaProducer(client, "topic")
producer.send_string("hello")

kafka/__init__.py
View File

@@ -1,5 +1,5 @@
__title__ = 'kafka'
-__version__ = '0.2-alpha'
+__version__ = '0.9.0'
__author__ = 'David Arthur'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'

kafka/client.py
View File

@@ -11,7 +11,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition,
LeaderUnavailableError,
KafkaUnavailableError)
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -25,14 +25,15 @@ class KafkaClient(object):
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
-def __init__(self, host, port, client_id=CLIENT_ID,
+def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
-self.conns = { # (host, port) -> KafkaConnection
-(host, port): KafkaConnection(host, port, timeout=timeout)
-}
+self.hosts = collect_hosts(hosts)
+# create connections only when we need them
+self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
@@ -42,6 +43,15 @@ class KafkaClient(object):
# Private API #
##################
def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(host, port)
return self.conns[host_key]
def _get_conn_for_broker(self, broker):
"""
Get or create a connection to a broker
@@ -50,7 +60,7 @@ class KafkaClient(object):
-self.conns[(broker.host, broker.port)] = \
-KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-return self.conns[(broker.host, broker.port)]
+return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
"""
@@ -83,14 +93,15 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
-for conn in self.conns.values():
+for (host, port) in self.hosts:
try:
+conn = self._get_conn(host, port)
conn.send(requestId, request)
response = conn.recv(requestId)
return response
except Exception, e:
-log.warning("Could not send request [%r] to server %s, "
-"trying next server: %s" % (request, conn, e))
+log.warning("Could not send request [%r] to server %s:%i, "
+"trying next server: %s" % (request, host, port, e))
continue
raise KafkaUnavailableError("All servers failed to process request")
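The reworked loop above is the core of the change: instead of iterating whatever connections happen to be cached, the client walks the bootstrap host list and opens connections lazily via _get_conn. A standalone sketch of that retry pattern (function and variable names here are illustrative, not part of the library):

```python
import logging

log = logging.getLogger("kafka")

def send_broker_unaware(hosts, get_conn, request_id, request):
    """Try each (host, port) bootstrap candidate until one answers.

    hosts    -- list of (host, port) tuples, e.g. from collect_hosts()
    get_conn -- factory returning a cached connection, like
                KafkaClient._get_conn
    """
    for host, port in hosts:
        try:
            conn = get_conn(host, port)
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception as e:
            log.warning("Could not send request [%r] to server %s:%i, "
                        "trying next server: %s" % (request, host, port, e))
    # The real client raises KafkaUnavailableError at this point.
    raise RuntimeError("All servers failed to process request")
```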

kafka/codec.py
View File

@@ -1,5 +1,9 @@
from cStringIO import StringIO
import gzip
import struct
_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
return result
-def snappy_encode(payload):
+def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
"""Encodes the given data with snappy if xerial_compatible is set then the
stream is encoded in a fashion compatible with the xerial snappy library
The block size (xerial_blocksize) controls how frequent the blocking occurs
32k is the default in the xerial library.
The format winds up being
+-------------+------------+--------------+------------+--------------+
| Header | Block1 len | Block1 data | Blockn len | Blockn data |
|-------------+------------+--------------+------------+--------------|
| 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
+-------------+------------+--------------+------------+--------------+
It is important to note that the blocksize is the amount of uncompressed
data presented to snappy at each block, whereas the blocklen is the
number of bytes that will be present in the stream; that is, the
length will always be <= blocksize.
"""
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
-return snappy.compress(payload)
if xerial_compatible:
def _chunker():
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
out = StringIO()
header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
for chunk in _chunker():
block = snappy.compress(chunk)
block_size = len(block)
out.write(struct.pack('!i', block_size))
out.write(block)
out.seek(0)
return out.read()
else:
return snappy.compress(payload)
def _detect_xerial_stream(payload):
"""Detects if the data given might have been encoded with the blocking mode
of the xerial snappy library.
This mode writes a magic header of the format:
+--------+--------------+------------+---------+--------+
| Marker | Magic String | Null / Pad | Version | Compat |
|--------+--------------+------------+---------+--------|
| byte | c-string | byte | int32 | int32 |
|--------+--------------+------------+---------+--------|
| -126 | 'SNAPPY' | \0 | | |
+--------+--------------+------------+---------+--------+
The pad appears to be there to ensure that SNAPPY is a valid cstring.
The version is the version of this format as written by xerial;
in the wild this is currently 1 and as such we only support v1.
Compat is there to claim the minimum supported version that
can read a xerial block stream; presently in the wild this is
1.
"""
if len(payload) > 16:
header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
return header == _XERIAL_V1_HEADER
return False
def snappy_decode(payload):
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
-return snappy.decompress(payload)
if _detect_xerial_stream(payload):
# TODO ? Should become a fileobj ?
out = StringIO()
byt = buffer(payload[16:])
length = len(byt)
cursor = 0
while cursor < length:
block_size = struct.unpack_from('!i', byt[cursor:])[0]
# Skip the block size
cursor += 4
end = cursor + block_size
out.write(snappy.decompress(byt[cursor:end]))
cursor = end
out.seek(0)
return out.read()
else:
return snappy.decompress(payload)
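Together, the xerial_compatible branch of snappy_encode and the detection path in snappy_decode round-trip the framed format described in the docstrings: a 16-byte header followed by (big-endian int32 length, snappy block) pairs. A quick sanity check, assuming python-snappy is installed (this mirrors the new unit tests at the bottom of this diff):

```python
import struct
from kafka.codec import snappy_encode, snappy_decode

payload = 'SNAPPY' * 50  # 300 bytes of uncompressed data

# A 300-byte blocksize over a 300-byte payload yields exactly one block.
framed = snappy_encode(payload, xerial_compatible=True, xerial_blocksize=300)

assert framed[:8] == '\x82SNAPPY\x00'               # marker, magic, pad
(block_len,) = struct.unpack('!i', framed[16:20])   # BE int32 block length
assert len(framed) == 16 + 4 + block_len
assert snappy_decode(framed) == payload
```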

kafka/conn.py
View File

@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
from random import shuffle
from threading import local
from kafka.common import ConnectionError
@@ -9,6 +10,31 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
randomizes the returned list.
"""
if isinstance(hosts, str):
hosts = hosts.strip().split(',')
result = []
for host_port in hosts:
res = host_port.split(':')
host = res[0]
port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
result.append((host.strip(), port))
if randomize:
shuffle(result)
return result
class KafkaConnection(local):
"""
@@ -81,7 +107,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
-except socket.error, e:
+except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
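collect_hosts above normalizes both accepted spellings of the bootstrap list into (host, port) tuples. For example (randomize=False is passed only to keep the output order predictable):

```python
from kafka.conn import collect_hosts

# Comma-separated string; a missing port falls back to DEFAULT_KAFKA_PORT.
print(collect_hosts('kafka01:9092,kafka02', randomize=False))
# => [('kafka01', 9092), ('kafka02', 9092)]

# A list of 'host:port' strings is normalized the same way.
print(collect_hosts(['kafka01:9092', 'kafka02:9093'], randomize=False))
# => [('kafka01', 9092), ('kafka02', 9093)]
```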

setup.py
View File

@@ -4,6 +4,7 @@ from setuptools import setup, Command
class Tox(Command):
user_options = []
def initialize_options(self):

test/test_integration.py
View File

@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
class KafkaTestCase(unittest.TestCase):
def setUp(self):
self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
ensure_topic_creation(self.client, self.topic)
@@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-cls.client = KafkaClient(cls.server.host, cls.server.port)
+cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
@classmethod
def tearDownClass(cls): # noqa
@@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-cls.client = KafkaClient(cls.server2.host, cls.server2.port)
+cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))
@classmethod
def tearDownClass(cls): # noqa
@@ -826,23 +826,26 @@ class TestConsumer(KafkaTestCase):
class TestFailover(KafkaTestCase):
-def setUp(self):
+@classmethod
+def setUpClass(cls): # noqa
zk_chroot = random_string(10)
replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
-self.zk = ZookeeperFixture.instance()
-kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
-super(TestFailover, self).setUp()
+cls.zk = ZookeeperFixture.instance()
+kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-def tearDown(self):
-self.client.close()
-for broker in self.brokers:
+hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
+cls.client = KafkaClient(hosts)
+@classmethod
+def tearDownClass(cls):
+cls.client.close()
+for broker in cls.brokers:
broker.close()
-self.zk.close()
+cls.zk.close()
def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
@@ -918,7 +921,8 @@ class TestFailover(KafkaTestCase):
return broker
def _count_messages(self, group, topic):
-client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:

test/test_unit.py
View File

@@ -3,11 +3,15 @@ import random
import struct
import unittest
from mock import MagicMock, patch
from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
-TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError
+TopicAndPartition, KafkaUnavailableError,
+LeaderUnavailableError, PartitionUnavailableError
)
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
@@ -17,10 +21,6 @@ from kafka.protocol import (
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
)
-from kafka.client import KafkaClient
-from mock import patch
ITERATIONS = 1000
STRLEN = 100
@@ -55,6 +55,7 @@ class TestPackage(unittest.TestCase):
from kafka import KafkaClient as KafkaClient2
self.assertEquals(KafkaClient2.__name__, "KafkaClient")
from kafka.codec import snappy_encode
self.assertEquals(snappy_encode.__name__, "snappy_encode")
@@ -74,6 +75,49 @@ class TestCodec(unittest.TestCase):
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_detect_xerial(self):
import kafka as kafka1
_detect_xerial_stream = kafka1.codec._detect_xerial_stream
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
short_data = b'\x01\x02\x03\x04'
self.assertTrue(_detect_xerial_stream(header))
self.assertFalse(_detect_xerial_stream(b''))
self.assertFalse(_detect_xerial_stream(b'\x00'))
self.assertFalse(_detect_xerial_stream(false_header))
self.assertFalse(_detect_xerial_stream(random_snappy))
self.assertFalse(_detect_xerial_stream(short_data))
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_decode_xerial(self):
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
block_len = len(random_snappy)
random_snappy2 = snappy_encode('XERIAL' * 50)
block_len2 = len(random_snappy2)
to_test = header \
+ struct.pack('!i', block_len) + random_snappy \
+ struct.pack('!i', block_len2) + random_snappy2
self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
'\x00\x00\x00\x18' + \
'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
'\x00\x00\x00\x18' + \
'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
self.assertEquals(compressed, to_ensure)
class TestProtocol(unittest.TestCase):
@@ -91,9 +135,8 @@ class TestProtocol(unittest.TestCase):
payloads = ["v1", "v2"]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
-self.assertEqual(
-msg.attributes,
-KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP)
+self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+KafkaProtocol.CODEC_GZIP)
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
@@ -108,9 +151,8 @@ class TestProtocol(unittest.TestCase):
payloads = ["v1", "v2"]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
-self.assertEqual(
-msg.attributes,
-KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY)
+self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+KafkaProtocol.CODEC_SNAPPY)
self.assertEqual(msg.key, None)
expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff"
"\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff"
@@ -281,7 +323,6 @@ class TestProtocol(unittest.TestCase):
len(ms3), ms3)
responses = list(KafkaProtocol.decode_fetch_response(encoded))
def expand_messages(response):
return FetchResponse(response.topic, response.partition,
response.error, response.highwaterMark,
@@ -369,6 +410,7 @@ class TestProtocol(unittest.TestCase):
def test_decode_offset_response(self):
pass
@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass
@@ -388,6 +430,81 @@ class TestProtocol(unittest.TestCase):
class TestKafkaClient(unittest.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(
hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
def test_init_with_csv(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
client = KafkaClient(
hosts='kafka01:9092,kafka02:9092,kafka03:9092')
self.assertItemsEqual(
[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
client.hosts)
def test_send_broker_unaware_request_fail(self):
'Tests that call fails when all hosts are unavailable'
mocked_conns = {
('kafka01', 9092): MagicMock(),
('kafka02', 9092): MagicMock()
}
# inject KafkaConnection side effects
mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("kafka02 went away (unittest)")
def mock_get_conn(host, port):
return mocked_conns[(host, port)]
# patch to avoid making requests before we want it
with patch.object(KafkaClient, 'load_metadata_for_topics'), \
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
self.assertRaises(
KafkaUnavailableError,
client._send_broker_unaware_request,
1, 'fake request')
for key, conn in mocked_conns.iteritems():
conn.send.assert_called_with(1, 'fake request')
def test_send_broker_unaware_request(self):
'Tests that call works when at least one of the hosts is available'
mocked_conns = {
('kafka01', 9092): MagicMock(),
('kafka02', 9092): MagicMock(),
('kafka03', 9092): MagicMock()
}
# inject KafkaConnection side effects
mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
def mock_get_conn(host, port):
return mocked_conns[(host, port)]
# patch to avoid making requests before we want it
with patch.object(KafkaClient, 'load_metadata_for_topics'), \
patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
resp = client._send_broker_unaware_request(1, 'fake request')
self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
@@ -416,7 +533,7 @@ class TestKafkaClient(unittest.TestCase):
protocol.decode_metadata_response.return_value = (brokers, topics)
# client loads metadata at init
-client = KafkaClient(host='broker_1', port=4567)
+client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({
TopicAndPartition('topic_1', 0): brokers[1],
TopicAndPartition('topic_noleader', 0): None,
@@ -440,7 +557,7 @@ class TestKafkaClient(unittest.TestCase):
topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)
-client = KafkaClient(host='broker_1', port=4567)
+client = KafkaClient(hosts=['broker_1:4567'])
# topic metadata is loaded but empty
self.assertDictEqual({}, client.topics_to_brokers)
@@ -473,7 +590,7 @@ class TestKafkaClient(unittest.TestCase):
topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)
-client = KafkaClient(host='broker_1', port=4567)
+client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({}, client.topics_to_brokers)
self.assertRaises(
@@ -499,7 +616,7 @@ class TestKafkaClient(unittest.TestCase):
}
protocol.decode_metadata_response.return_value = (brokers, topics)
-client = KafkaClient(host='broker_1', port=4567)
+client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual(
{
TopicAndPartition('topic_noleader', 0): None,
@@ -535,7 +652,7 @@ class TestKafkaClient(unittest.TestCase):
}
protocol.decode_metadata_response.return_value = (brokers, topics)
-client = KafkaClient(host='broker_1', port=4567)
+client = KafkaClient(hosts=['broker_1:4567'])
requests = [ProduceRequest(
"topic_noleader", 0,