Make external API consistently support python3 strings for topic.

Space
2015-03-31 15:25:38 -07:00
parent 9fd0811917
commit 1c856e8400
11 changed files with 69 additions and 49 deletions
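Note: every call site below funnels topic (and group) names through kafka.util.kafka_bytestring, whose implementation is not part of this diff. A minimal sketch of what such a helper presumably does -- encode text to UTF-8 and pass bytes through unchanged -- might look like this (an assumption, not the actual source):

    import six

    def kafka_bytestring(s):
        # Text (unicode) names are encoded to UTF-8 bytes;
        # values that are already bytes are returned unchanged.
        if isinstance(s, six.text_type):
            return s.encode('utf-8')
        if isinstance(s, six.binary_type):
            return s
        raise TypeError('topic/group names must be text or bytes')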

View File

@@ -258,12 +258,14 @@ class KafkaClient(object):
         self.topic_partitions.clear()
 
     def has_metadata_for_topic(self, topic):
+        topic = kafka_bytestring(topic)
         return (
             topic in self.topic_partitions
             and len(self.topic_partitions[topic]) > 0
         )
 
     def get_partition_ids_for_topic(self, topic):
+        topic = kafka_bytestring(topic)
         if topic not in self.topic_partitions:
             return []

@@ -312,6 +314,7 @@ class KafkaClient(object):
         Partition-level errors will also not be raised here
         (a single partition w/o a leader, for example)
         """
+        topics = [kafka_bytestring(t) for t in topics]
         resp = self.send_metadata_request(topics)
 
         log.debug("Broker metadata: %s", resp.brokers)

View File

@@ -10,7 +10,7 @@ from kafka.common import (
     UnknownTopicOrPartitionError, check_error
 )
 
-from kafka.util import ReentrantTimer
+from kafka.util import kafka_bytestring, ReentrantTimer
 
 log = logging.getLogger("kafka")

@@ -44,8 +44,8 @@ class Consumer(object):
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
 
         self.client = client
-        self.topic = topic
-        self.group = group
+        self.topic = kafka_bytestring(topic)
+        self.group = None if group is None else kafka_bytestring(group)
         self.client.load_metadata_for_topics(topic)
         self.offsets = {}
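Consumer subclasses (SimpleConsumer, MultiProcessConsumer) inherit this constructor, so the group and topic may now be passed as text. A small sketch, assuming a broker at a hypothetical localhost:9092:

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient('localhost:9092')
    # Both the group and the topic are coerced to bytes internally.
    consumer = SimpleConsumer(client, 'my-group', 'my-topic')
    for message in consumer:
        print(message.offset, message.message.value)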

View File

@@ -163,7 +163,7 @@ class MultiProcessConsumer(Consumer):
             simple_consumer_options.pop('partitions', None)
             options.update(simple_consumer_options)
 
-            args = (client.copy(), group, topic, self.queue,
+            args = (client.copy(), self.group, self.topic, self.queue,
                     self.size, self.events)
             proc = Process(target=_mp_consume, args=args, kwargs=options)
             proc.daemon = True

View File

@@ -17,6 +17,7 @@ from kafka.common import (
     ProduceRequest, TopicAndPartition, UnsupportedCodecError
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.util import kafka_bytestring
 
 log = logging.getLogger("kafka")

@@ -170,6 +171,7 @@ class Producer(object):
         All messages produced via this method will set the message 'key' to Null
         """
+        topic = kafka_bytestring(topic)
         return self._send_messages(topic, partition, *msg)
 
     def _send_messages(self, topic, partition, *msg, **kwargs):

@@ -183,6 +185,10 @@
         if any(not isinstance(m, six.binary_type) for m in msg):
             raise TypeError("all produce message payloads must be type bytes")
 
+        # Raise TypeError if topic is not encoded as bytes
+        if not isinstance(topic, six.binary_type):
+            raise TypeError("the topic must be type bytes")
+
         # Raise TypeError if the key is not encoded as bytes
         if key is not None and not isinstance(key, six.binary_type):
             raise TypeError("the key must be type bytes")

View File

@@ -3,6 +3,8 @@ from __future__ import absolute_import
 import logging
 
 from kafka.partitioner import HashedPartitioner
+from kafka.util import kafka_bytestring
+
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
     BATCH_SEND_MSG_COUNT

@@ -57,10 +59,12 @@ class KeyedProducer(Producer):
         return partitioner.partition(key)
 
     def send_messages(self,topic,key,*msg):
+        topic = kafka_bytestring(topic)
         partition = self._next_partition(topic, key)
         return self._send_messages(topic, partition, *msg,key=key)
 
     def send(self, topic, key, msg):
+        topic = kafka_bytestring(topic)
         partition = self._next_partition(topic, key)
         return self._send_messages(topic, partition, msg, key=key)
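Both entry points now coerce the topic, so KeyedProducer callers can pass a text topic, while the bytes checks in Producer._send_messages() still apply to the key and payloads. For example (hypothetical broker and topic):

    from kafka import KafkaClient
    from kafka.producer import KeyedProducer

    client = KafkaClient('localhost:9092')
    producer = KeyedProducer(client)
    producer.send_messages('my-topic', b'user-1', b'payload')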

View File

@@ -117,21 +117,21 @@ class TestKafkaClient(unittest.TestCase):
         ]
 
         topics = [
-            TopicMetadata('topic_1', NO_ERROR, [
-                PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
+            TopicMetadata(b'topic_1', NO_ERROR, [
+                PartitionMetadata(b'topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
             ]),
-            TopicMetadata('topic_noleader', NO_ERROR, [
-                PartitionMetadata('topic_noleader', 0, -1, [], [],
+            TopicMetadata(b'topic_noleader', NO_ERROR, [
+                PartitionMetadata(b'topic_noleader', 0, -1, [], [],
                                   NO_LEADER),
-                PartitionMetadata('topic_noleader', 1, -1, [], [],
+                PartitionMetadata(b'topic_noleader', 1, -1, [], [],
                                   NO_LEADER),
             ]),
-            TopicMetadata('topic_no_partitions', NO_LEADER, []),
-            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
-            TopicMetadata('topic_3', NO_ERROR, [
-                PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
-                PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
-                PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
+            TopicMetadata(b'topic_no_partitions', NO_LEADER, []),
+            TopicMetadata(b'topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata(b'topic_3', NO_ERROR, [
+                PartitionMetadata(b'topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+                PartitionMetadata(b'topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+                PartitionMetadata(b'topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
             ])
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

@@ -139,12 +139,12 @@ class TestKafkaClient(unittest.TestCase):
         # client loads metadata at init
         client = KafkaClient(hosts=['broker_1:4567'])
         self.assertDictEqual({
-            TopicAndPartition('topic_1', 0): brokers[1],
-            TopicAndPartition('topic_noleader', 0): None,
-            TopicAndPartition('topic_noleader', 1): None,
-            TopicAndPartition('topic_3', 0): brokers[0],
-            TopicAndPartition('topic_3', 1): brokers[1],
-            TopicAndPartition('topic_3', 2): brokers[0]},
+            TopicAndPartition(b'topic_1', 0): brokers[1],
+            TopicAndPartition(b'topic_noleader', 0): None,
+            TopicAndPartition(b'topic_noleader', 1): None,
+            TopicAndPartition(b'topic_3', 0): brokers[0],
+            TopicAndPartition(b'topic_3', 1): brokers[1],
+            TopicAndPartition(b'topic_3', 2): brokers[0]},
             client.topics_to_brokers)
 
         # if we ask for metadata explicitly, it should raise errors

@@ -156,6 +156,7 @@ class TestKafkaClient(unittest.TestCase):
         # This should not raise
         client.load_metadata_for_topics('topic_no_leader')
+        client.load_metadata_for_topics(b'topic_no_leader')
 
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')

@@ -169,11 +170,11 @@ class TestKafkaClient(unittest.TestCase):
         ]
 
         topics = [
-            TopicMetadata('topic_still_creating', NO_LEADER, []),
-            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
-            TopicMetadata('topic_noleaders', NO_ERROR, [
-                PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
-                PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+            TopicMetadata(b'topic_still_creating', NO_LEADER, []),
+            TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata(b'topic_noleaders', NO_ERROR, [
+                PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER),
+                PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER),
             ]),
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

@@ -188,8 +189,8 @@ class TestKafkaClient(unittest.TestCase):
         self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
 
     @patch('kafka.client.KafkaConnection')
-    @patch('kafka.client.KafkaProtocol')
-    def test_ensure_topic_exists(self, protocol, conn):
+    @patch('kafka.client.KafkaProtocol.decode_metadata_response')
+    def test_ensure_topic_exists(self, decode_metadata_response, conn):
 
         conn.recv.return_value = 'response'  # anything but None

@@ -199,14 +200,14 @@ class TestKafkaClient(unittest.TestCase):
         ]
 
         topics = [
-            TopicMetadata('topic_still_creating', NO_LEADER, []),
-            TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
-            TopicMetadata('topic_noleaders', NO_ERROR, [
-                PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
-                PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+            TopicMetadata(b'topic_still_creating', NO_LEADER, []),
+            TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata(b'topic_noleaders', NO_ERROR, [
+                PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER),
+                PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER),
             ]),
         ]
-        protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+        decode_metadata_response.return_value = MetadataResponse(brokers, topics)
 
         client = KafkaClient(hosts=['broker_1:4567'])

@@ -218,6 +219,7 @@ class TestKafkaClient(unittest.TestCase):
         # This should not raise
         client.ensure_topic_exists('topic_noleaders', timeout=1)
+        client.ensure_topic_exists(b'topic_noleaders', timeout=1)
 
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')

@@ -269,8 +271,8 @@ class TestKafkaClient(unittest.TestCase):
         ]
 
         topics = [
-            TopicMetadata('topic_no_partitions', NO_LEADER, []),
-            TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+            TopicMetadata(b'topic_no_partitions', NO_LEADER, []),
+            TopicMetadata(b'topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
         ]
         protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

@@ -279,10 +281,10 @@ class TestKafkaClient(unittest.TestCase):
         self.assertDictEqual({}, client.topics_to_brokers)
 
         with self.assertRaises(LeaderNotAvailableError):
-            client._get_leader_for_partition('topic_no_partitions', 0)
+            client._get_leader_for_partition(b'topic_no_partitions', 0)
 
         with self.assertRaises(UnknownTopicOrPartitionError):
-            client._get_leader_for_partition('topic_unknown', 0)
+            client._get_leader_for_partition(b'topic_unknown', 0)
 
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')

View File

@@ -29,11 +29,11 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_consume_none(self):
-        fetch = FetchRequest(self.topic, 0, 0, 1024)
+        fetch = FetchRequest(self.bytes_topic, 0, 0, 1024)
 
         fetch_resp, = self.client.send_fetch_request([fetch])
         self.assertEqual(fetch_resp.error, 0)
-        self.assertEqual(fetch_resp.topic, self.topic)
+        self.assertEqual(fetch_resp.topic, self.bytes_topic)
         self.assertEqual(fetch_resp.partition, 0)
 
         messages = list(fetch_resp.messages)

@@ -56,11 +56,11 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
     @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
     def test_commit_fetch_offsets(self):
-        req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
+        req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
         (resp,) = self.client.send_offset_commit_request(b"group", [req])
         self.assertEqual(resp.error, 0)
 
-        req = OffsetFetchRequest(self.topic, 0)
+        req = OffsetFetchRequest(self.bytes_topic, 0)
         (resp,) = self.client.send_offset_fetch_request(b"group", [req])
         self.assertEqual(resp.error, 0)
         self.assertEqual(resp.offset, 42)

View File

@@ -37,7 +37,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     def send_messages(self, partition, messages):
         messages = [ create_message(self.msg(str(msg))) for msg in messages ]
-        produce = ProduceRequest(self.topic, partition, messages = messages)
+        produce = ProduceRequest(self.bytes_topic, partition, messages = messages)
 
         resp, = self.client.send_produce_request([produce])
         self.assertEqual(resp.error, 0)

View File

@@ -8,6 +8,7 @@ from kafka import KafkaClient, SimpleConsumer
 from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
 from kafka.producer.base import Producer
 from kafka.producer import KeyedProducer
+from kafka.util import kafka_bytestring
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (

@@ -147,7 +148,7 @@ class TestFailover(KafkaIntegrationTestCase):
                 key = random_string(3)
                 msg = random_string(10)
                 producer.send_messages(topic, key, msg)
-                if producer.partitioners[topic].partition(key) == 0:
+                if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
                     recovered = True
             except (FailedPayloadsError, ConnectionError):
                 logging.debug("caught exception sending message -- will retry")

@@ -172,7 +173,7 @@
         logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
 
     def _kill_leader(self, topic, partition):
-        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
         return broker

View File

@@ -453,7 +453,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     def assert_produce_request(self, messages, initial_offset, message_ct,
                                partition=0):
-        produce = ProduceRequest(self.topic, partition, messages=messages)
+        produce = ProduceRequest(self.bytes_topic, partition, messages=messages)
 
         # There should only be one response message from the server.
         # This will throw an exception if there's more than one.

@@ -471,7 +471,7 @@
         # There should only be one response message from the server.
         # This will throw an exception if there's more than one.
-        resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
+        resp, = self.client.send_fetch_request([ FetchRequest(self.bytes_topic, partition, start_offset, 1024) ])
 
         self.assertEqual(resp.error, 0)
         self.assertEqual(resp.partition, partition)

View File

@@ -12,6 +12,7 @@ from . import unittest
 from kafka import KafkaClient
 from kafka.common import OffsetRequest
+from kafka.util import kafka_bytestring
 
 __all__ = [
     'random_string',

@@ -50,6 +51,7 @@ def get_open_port():
 class KafkaIntegrationTestCase(unittest.TestCase):
     create_client = True
     topic = None
+    bytes_topic = None
     server = None
 
     def setUp(self):

@@ -59,7 +61,8 @@ class KafkaIntegrationTestCase(unittest.TestCase):
         if not self.topic:
             topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10).decode('utf-8'))
-            self.topic = topic.encode('utf-8')
+            self.topic = topic
+            self.bytes_topic = topic.encode('utf-8')
 
         if self.create_client:
             self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))

@@ -77,7 +80,8 @@ class KafkaIntegrationTestCase(unittest.TestCase):
             self.client.close()
 
     def current_offset(self, topic, partition):
-        offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ])
+        offsets, = self.client.send_offset_request([ OffsetRequest(kafka_bytestring(topic),
+                                                                   partition, -1, 1) ])
         return offsets.offsets[0]
 
     def msgs(self, iterable):