Merge branch 'master' into conn_refactor

Conflicts:
	example.py
Author: Mark Roberts
Date:   2014-02-26 21:36:22 -08:00

8 changed files with 154 additions and 31 deletions

View File

@@ -29,7 +29,7 @@ from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer, KeyedProducer

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

 # To send messages synchronously
 producer = SimpleProducer(kafka)
@@ -80,7 +80,7 @@ from kafka.client import KafkaClient
 from kafka.producer import KeyedProducer
 from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

 # HashedPartitioner is default
 producer = KeyedProducer(kafka)
@@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 from kafka.client import KafkaClient
 from kafka.consumer import MultiProcessConsumer

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

 # This will split the number of partitions among two processes
 consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
 ```python
 from kafka.client import KafkaClient

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 req = ProduceRequest(topic="my-topic", partition=1,
     messages=[KafkaProdocol.encode_message("some message")])
 resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
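
Taken together, the four README hunks above document one API change: KafkaClient now takes a single `hosts` argument instead of separate `host` and `port` parameters. A minimal sketch of the accepted forms under the new signature (the broker names are placeholders, and a reachable broker is assumed since the constructor bootstraps cluster metadata):

```python
from kafka.client import KafkaClient

# New-style bootstrap: one comma-separated string...
client = KafkaClient("kafka01:9092,kafka02:9092")

# ...or a list of "host:port" strings. ":port" may be omitted,
# in which case the default port 9092 is used.
client = KafkaClient(["kafka01:9092", "kafka02"])
```

Either form replaces the removed two-argument call `KafkaClient("localhost", 9092)`.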

View File

@@ -9,7 +9,7 @@ class Producer(threading.Thread):
     daemon = True

     def run(self):
-        client = KafkaClient("localhost", 9092)
+        client = KafkaClient("localhost:9092")
         producer = SimpleProducer(client)

         while True:
@@ -23,7 +23,7 @@ class Consumer(threading.Thread):
     daemon = True

     def run(self):
-        client = KafkaClient("localhost", 9092)
+        client = KafkaClient("localhost:9092")
         consumer = SimpleConsumer(client, "test-group", "my-topic")

         for message in consumer:

View File

@@ -18,7 +18,7 @@ There are a few levels of abstraction:
 # Possible API

-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")
 producer = KafkaProducer(client, "topic")
 producer.send_string("hello")

View File

@@ -10,7 +10,7 @@ from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
                           BrokerResponseError, PartitionUnavailableError,
                           KafkaUnavailableError, KafkaRequestError)

-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol

 log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
     # NOTE: The timeout given to the client should always be greater than the
     #       one passed to SimpleConsumer.get_message(), otherwise you can get a
     #       socket timeout.
-    def __init__(self, host, port, client_id=CLIENT_ID,
+    def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
-        self.conns = {  # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, timeout=timeout)
-        }
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
@@ -41,6 +42,15 @@ class KafkaClient(object):
     #   Private API   #
     ##################

+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
+
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port)
+
+        return self.conns[host_key]
+
     def _get_conn_for_broker(self, broker):
         """
         Get or create a connection to a broker
@@ -49,7 +59,7 @@ class KafkaClient(object):
             self.conns[(broker.host, broker.port)] = \
                 KafkaConnection(broker.host, broker.port, timeout=self.timeout)

-        return self.conns[(broker.host, broker.port)]
+        return self._get_conn(broker.host, broker.port)

     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
@@ -72,7 +82,8 @@ class KafkaClient(object):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for conn in self.conns.values():
+        for (host, port) in self.hosts:
+            conn = self._get_conn(host, port)
             try:
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
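
Two behavioral changes land in this file: connections are created lazily through the new `_get_conn` and cached in `self.conns`, and broker-unaware requests now iterate over the configured `self.hosts` rather than over whatever connections already exist. Below is a self-contained sketch of the lazy get-or-create pattern; `FakeConnection` and `LazyPool` are hypothetical stand-ins so the snippet runs without a broker:

```python
class FakeConnection(object):
    """Stand-in for KafkaConnection; records its endpoint, dials nothing."""
    def __init__(self, host, port):
        self.host, self.port = host, port

class LazyPool(object):
    def __init__(self, hosts):
        self.hosts = hosts  # list of (host, port) tuples
        self.conns = {}     # created only when we need them

    def _get_conn(self, host, port):
        # Same get-or-create shape as KafkaClient._get_conn in the diff
        host_key = (host, port)
        if host_key not in self.conns:
            self.conns[host_key] = FakeConnection(host, port)
        return self.conns[host_key]

pool = LazyPool([('kafka01', 9092), ('kafka02', 9092)])
assert pool.conns == {}                          # nothing created at init
conn = pool._get_conn('kafka01', 9092)
assert pool._get_conn('kafka01', 9092) is conn   # second call hits the cache
assert ('kafka02', 9092) not in pool.conns       # untouched host stays cold
```

A side effect worth noting: since `__init__` no longer opens a bootstrap connection, an unreachable address in `hosts` only surfaces on first use.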

View File

@@ -2,6 +2,7 @@ import copy
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local

 from kafka.common import ConnectionError
@@ -9,6 +10,31 @@ from kafka.common import ConnectionError
 log = logging.getLogger("kafka")

 DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
+
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomizes the returned list.
+    """
+
+    if isinstance(hosts, str):
+        hosts = hosts.strip().split(',')
+
+    result = []
+    for host_port in hosts:
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result
+
+
 class KafkaConnection(local):
     """
@@ -84,7 +110,7 @@ class KafkaConnection(local):
             sent = self._sock.sendall(payload)
             if sent is not None:
                 self._raise_connection_error()
-        except socket.error, e:
+        except socket.error:
             log.exception('Unable to send payload to Kafka')
             self._raise_connection_error()
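
The new `collect_hosts` helper normalizes whatever the caller passes (a CSV string or a list) into `(host, port)` tuples. A quick usage sketch, assuming the refactored `kafka.conn` is importable; the broker names are invented, and `randomize=False` keeps the output order deterministic:

```python
from kafka.conn import collect_hosts

# A CSV string: whitespace around entries is stripped, and an entry
# without ':port' falls back to DEFAULT_KAFKA_PORT (9092).
print(collect_hosts('kafka01:9092, kafka02', randomize=False))
# -> [('kafka01', 9092), ('kafka02', 9092)]

# A list works too, and each entry keeps its explicit port.
print(collect_hosts(['kafka01:9092', 'kafka02:9093'], randomize=False))
# -> [('kafka01', 9092), ('kafka02', 9093)]
```

The shuffle being on by default means repeated clients spread their bootstrap attempts across brokers instead of always hitting the first one listed.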

View File

@@ -1,10 +1,10 @@
+import os.path
 import sys

 from setuptools import setup, Command


 class Tox(Command):

     user_options = []

     def initialize_options(self):

View File

@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
 class KafkaTestCase(unittest.TestCase):
     def setUp(self):
-        self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+        self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
         ensure_topic_creation(self.client, self.topic)
@@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))

     @classmethod
     def tearDownClass(cls):  # noqa
@@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))

     @classmethod
     def tearDownClass(cls):  # noqa
@@ -826,23 +826,26 @@ class TestConsumer(KafkaTestCase):
 class TestFailover(KafkaTestCase):
-    def setUp(self):
+    @classmethod
+    def setUpClass(cls):  # noqa
         zk_chroot = random_string(10)
         replicas = 2
         partitions = 2

         # mini zookeeper, 2 kafka brokers
-        self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
-        super(TestFailover, self).setUp()
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]

-    def tearDown(self):
-        self.client.close()
-        for broker in self.brokers:
+        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
+        cls.client = KafkaClient(hosts)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
             broker.close()
-        self.zk.close()
+        cls.zk.close()

     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
@@ -918,7 +921,8 @@ class TestFailover(KafkaTestCase):
         return broker

     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
         consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
         all_messages = []
         for message in consumer:

View File

@@ -3,11 +3,16 @@ import random
 import struct
 import unittest

+from mock import MagicMock, patch
+
+from kafka import KafkaClient
 from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
     OffsetAndMessage, BrokerMetadata, PartitionMetadata
 )
+from kafka.common import KafkaUnavailableError
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
@@ -405,7 +410,6 @@ class TestProtocol(unittest.TestCase):
     def test_decode_offset_response(self):
         pass

     @unittest.skip("Not Implemented")
     def test_encode_offset_commit_request(self):
         pass
@@ -423,5 +427,83 @@ class TestProtocol(unittest.TestCase):
         pass


+class TestKafkaClient(unittest.TestCase):
+
+    def test_init_with_list(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
+    def test_init_with_csv(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
+    def test_send_broker_unaware_request_fail(self):
+        'Tests that call fails when all hosts are unavailable'
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock()
+        }
+        # inject KafkaConnection side effects
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+
+            self.assertRaises(
+                KafkaUnavailableError,
+                client._send_broker_unaware_request,
+                1, 'fake request')
+
+            for key, conn in mocked_conns.iteritems():
+                conn.send.assert_called_with(1, 'fake request')
+
+    def test_send_broker_unaware_request(self):
+        'Tests that call works when at least one of the hosts is available'
+
+        mocked_conns = {
+            ('kafka01', 9092): MagicMock(),
+            ('kafka02', 9092): MagicMock(),
+            ('kafka03', 9092): MagicMock()
+        }
+        # inject KafkaConnection side effects
+        mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+        mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+        mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+        def mock_get_conn(host, port):
+            return mocked_conns[(host, port)]
+
+        # patch to avoid making requests before we want it
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+                patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+            resp = client._send_broker_unaware_request(1, 'fake request')
+
+            self.assertEqual('valid response', resp)
+            mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
 if __name__ == '__main__':
     unittest.main()
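
The two `_send_broker_unaware_request` tests above pin down the retry contract: try each bootstrap host in turn, return the first successful response, and raise `KafkaUnavailableError` only once every host has failed. A standalone sketch of that contract (the exception class and message here are simplified stand-ins, not the library's exact ones):

```python
class KafkaUnavailableError(Exception):
    """Stand-in for kafka.common.KafkaUnavailableError."""

def send_broker_unaware_request(conns, request_id, request):
    # First host to answer wins; mirrors the loop shown in kafka/client.py.
    for conn in conns:
        try:
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception:
            continue  # this host is down, try the next one
    raise KafkaUnavailableError("all hosts failed to process the request")
```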