Drop kafka_bytestring

Dana Powers
2015-12-10 16:24:32 -08:00
parent 7a80422494
commit d54980a2cd
14 changed files with 47 additions and 81 deletions
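Before this commit, topic and group names had to be encoded to bytes, either by the caller or via the now-removed kafka_bytestring() helper; after it, they are passed through the client, consumer, and producer APIs as plain strings, while message payloads and keys remain bytes. A minimal usage sketch of the resulting API (broker address and topic name are placeholders):

```python
# Illustrative sketch; broker address and topic name are placeholders.
from kafka import KafkaClient, SimpleProducer

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client)

# Topic names are plain strings after this commit -- no manual .encode('utf-8')
# or kafka_bytestring() call is needed. Message payloads must still be bytes.
producer.send_messages('my-topic', b'some message', b'another message')
```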

View File

@@ -17,7 +17,6 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError,
 from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
-from kafka.util import kafka_bytestring
 log = logging.getLogger(__name__)
@@ -212,7 +211,7 @@ class KafkaClient(object):
 failed_payloads(broker_payloads)
 continue
-conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+conn = self._get_conn(broker.host, broker.port)
 request = encoder_fn(payloads=broker_payloads)
 # decoder_fn=None signal that the server is expected to not
 # send a response. This probably only applies to
@@ -305,7 +304,7 @@ class KafkaClient(object):
 # Send the request, recv the response
 try:
-conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+conn = self._get_conn(broker.host, broker.port)
 conn.send(requestId, request)
 except ConnectionError as e:
@@ -410,14 +409,12 @@ class KafkaClient(object):
 self.topic_partitions.clear()
 def has_metadata_for_topic(self, topic):
-topic = kafka_bytestring(topic)
 return (
 topic in self.topic_partitions
 and len(self.topic_partitions[topic]) > 0
 )
 def get_partition_ids_for_topic(self, topic):
-topic = kafka_bytestring(topic)
 if topic not in self.topic_partitions:
 return []

View File

@@ -11,7 +11,7 @@ from kafka.common import (
 UnknownTopicOrPartitionError, check_error, KafkaError
 )
-from kafka.util import kafka_bytestring, ReentrantTimer
+from kafka.util import ReentrantTimer
 log = logging.getLogger('kafka.consumer')
@@ -47,8 +47,8 @@ class Consumer(object):
 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
 self.client = client
-self.topic = kafka_bytestring(topic)
-self.group = None if group is None else kafka_bytestring(group)
+self.topic = topic
+self.group = group
 self.client.load_metadata_for_topics(topic)
 self.offsets = {}

View File

@@ -16,7 +16,6 @@ from kafka.common import (
 OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
 FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
 )
-from kafka.util import kafka_bytestring
 logger = logging.getLogger(__name__)
@@ -307,13 +306,13 @@ class KafkaConsumer(object):
 # Topic name str -- all partitions
 if isinstance(arg, (six.string_types, six.binary_type)):
-topic = kafka_bytestring(arg)
+topic = arg
 for partition in self._cluster.partitions_for_topic(topic):
 self._consume_topic_partition(topic, partition)
 # (topic, partition [, offset]) tuple
 elif isinstance(arg, tuple):
-topic = kafka_bytestring(arg[0])
+topic = arg[0]
 partition = arg[1]
 self._consume_topic_partition(topic, partition)
 if len(arg) == 3:
@@ -326,7 +325,7 @@ class KafkaConsumer(object):
 # key can be string (a topic)
 if isinstance(key, (six.string_types, six.binary_type)):
-topic = kafka_bytestring(key)
+topic = key
 # topic: partition
 if isinstance(value, int):
@@ -344,7 +343,7 @@ class KafkaConsumer(object):
 # (topic, partition): offset
 elif isinstance(key, tuple):
-topic = kafka_bytestring(key[0])
+topic = key[0]
 partition = key[1]
 self._consume_topic_partition(topic, partition)
 self._offsets.fetch[(topic, partition)] = value
@@ -463,7 +462,7 @@ class KafkaConsumer(object):
 self._refresh_metadata_on_error()
 continue
-topic = kafka_bytestring(resp.topic)
+topic = resp.topic
 partition = resp.partition
 try:
 check_error(resp)
@@ -662,7 +661,7 @@ class KafkaConsumer(object):
 if commits:
 logger.info('committing consumer offsets to group %s', self._config['group_id'])
 resps = self._client.send_offset_commit_request(
-kafka_bytestring(self._config['group_id']), commits,
+self._config['group_id'], commits,
 fail_on_error=False
 )
@@ -725,7 +724,7 @@ class KafkaConsumer(object):
 logger.info("Consumer fetching stored offsets")
 for topic_partition in self._topics:
 (resp,) = self._client.send_offset_fetch_request(
-kafka_bytestring(self._config['group_id']),
+self._config['group_id'],
 [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
 fail_on_error=False)
 try:

View File

@@ -17,7 +17,6 @@ from kafka.common import (
 OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
 FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
 )
-from kafka.util import kafka_bytestring
 logger = logging.getLogger(__name__)
@@ -193,14 +192,14 @@ class KafkaConsumer(object):
 # Topic name str -- all partitions
 if isinstance(arg, (six.string_types, six.binary_type)):
-topic = kafka_bytestring(arg)
+topic = arg
 for partition in self._client.get_partition_ids_for_topic(topic):
 self._consume_topic_partition(topic, partition)
 # (topic, partition [, offset]) tuple
 elif isinstance(arg, tuple):
-topic = kafka_bytestring(arg[0])
+topic = arg[0]
 partition = arg[1]
 self._consume_topic_partition(topic, partition)
 if len(arg) == 3:
@@ -213,7 +212,7 @@ class KafkaConsumer(object):
 # key can be string (a topic)
 if isinstance(key, (six.string_types, six.binary_type)):
-topic = kafka_bytestring(key)
+topic = key
 # topic: partition
 if isinstance(value, int):
@@ -231,7 +230,7 @@ class KafkaConsumer(object):
 # (topic, partition): offset
 elif isinstance(key, tuple):
-topic = kafka_bytestring(key[0])
+topic = key[0]
 partition = key[1]
 self._consume_topic_partition(topic, partition)
 self._offsets.fetch[(topic, partition)] = value
@@ -354,7 +353,7 @@ class KafkaConsumer(object):
 self._refresh_metadata_on_error()
 continue
-topic = kafka_bytestring(resp.topic)
+topic = resp.topic
 partition = resp.partition
 try:
 check_error(resp)
@@ -553,7 +552,7 @@ class KafkaConsumer(object):
 if commits:
 logger.info('committing consumer offsets to group %s', self._config['group_id'])
 resps = self._client.send_offset_commit_request(
-kafka_bytestring(self._config['group_id']), commits,
+self._config['group_id'], commits,
 fail_on_error=False
 )
@@ -577,7 +576,6 @@ class KafkaConsumer(object):
 #
 def _consume_topic_partition(self, topic, partition):
-topic = kafka_bytestring(topic)
 if not isinstance(partition, int):
 raise KafkaConfigurationError('Unknown partition type (%s) '
 '-- expected int' % type(partition))
@@ -617,7 +615,7 @@ class KafkaConsumer(object):
 logger.info("Consumer fetching stored offsets")
 for topic_partition in self._topics:
 (resp,) = self._client.send_offset_fetch_request(
-kafka_bytestring(self._config['group_id']),
+self._config['group_id'],
 [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])],
 fail_on_error=False)
 try:

View File

@@ -22,7 +22,6 @@ from kafka.common import (
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
-from kafka.util import kafka_bytestring
 log = logging.getLogger('kafka.producer')
@@ -361,7 +360,6 @@ class Producer(object):
 All messages produced via this method will set the message 'key' to Null
 """
-topic = kafka_bytestring(topic)
 return self._send_messages(topic, partition, *msg)
 def _send_messages(self, topic, partition, *msg, **kwargs):
@@ -381,10 +379,6 @@ class Producer(object):
 elif not isinstance(m, six.binary_type):
 raise TypeError("all produce message payloads must be null or type bytes")
-# Raise TypeError if topic is not encoded as bytes
-if not isinstance(topic, six.binary_type):
-raise TypeError("the topic must be type bytes")
 # Raise TypeError if the key is not encoded as bytes
 if key is not None and not isinstance(key, six.binary_type):
 raise TypeError("the key must be type bytes")

View File

@@ -5,7 +5,6 @@ import warnings
 from .base import Producer
 from ..partitioner import HashedPartitioner
-from ..util import kafka_bytestring
 log = logging.getLogger(__name__)
@@ -38,7 +37,6 @@ class KeyedProducer(Producer):
 return partitioner.partition(key)
 def send_messages(self, topic, key, *msg):
-topic = kafka_bytestring(topic)
 partition = self._next_partition(topic, key)
 return self._send_messages(topic, partition, *msg, key=key)

View File

@@ -46,9 +46,6 @@ class SimpleProducer(Producer):
 return next(self.partition_cycles[topic])
 def send_messages(self, topic, *msg):
-if not isinstance(topic, six.binary_type):
-topic = topic.encode('utf-8')
 partition = self._next_partition(topic)
 return super(SimpleProducer, self).send_messages(
 topic, partition, *msg

View File

@@ -95,18 +95,6 @@ def group_by_topic_and_partition(tuples):
 return out
-def kafka_bytestring(s):
-"""
-Takes a string or bytes instance
-Returns bytes, encoding strings in utf-8 as necessary
-"""
-if isinstance(s, six.binary_type):
-return s
-if isinstance(s, six.string_types):
-return s.encode('utf-8')
-raise TypeError(s)
 class ReentrantTimer(object):
 """
 A timer that can be restarted, unlike threading.Timer

View File

@@ -133,12 +133,12 @@ class TestKafkaClient(unittest.TestCase):
 # client loads metadata at init
 client = KafkaClient(hosts=['broker_1:4567'])
 self.assertDictEqual({
-TopicAndPartition(b'topic_1', 0): brokers[1],
-TopicAndPartition(b'topic_noleader', 0): None,
-TopicAndPartition(b'topic_noleader', 1): None,
-TopicAndPartition(b'topic_3', 0): brokers[0],
-TopicAndPartition(b'topic_3', 1): brokers[1],
-TopicAndPartition(b'topic_3', 2): brokers[0]},
+TopicAndPartition('topic_1', 0): brokers[1],
+TopicAndPartition('topic_noleader', 0): None,
+TopicAndPartition('topic_noleader', 1): None,
+TopicAndPartition('topic_3', 0): brokers[0],
+TopicAndPartition('topic_3', 1): brokers[1],
+TopicAndPartition('topic_3', 2): brokers[0]},
 client.topics_to_brokers)
 # if we ask for metadata explicitly, it should raise errors
@@ -150,7 +150,6 @@ class TestKafkaClient(unittest.TestCase):
 # This should not raise
 client.load_metadata_for_topics('topic_no_leader')
-client.load_metadata_for_topics(b'topic_no_leader')
 @patch('kafka.client.BrokerConnection')
 @patch('kafka.client.KafkaProtocol')
@@ -274,10 +273,10 @@ class TestKafkaClient(unittest.TestCase):
 self.assertDictEqual({}, client.topics_to_brokers)
 with self.assertRaises(LeaderNotAvailableError):
-client._get_leader_for_partition(b'topic_no_partitions', 0)
+client._get_leader_for_partition('topic_no_partitions', 0)
 with self.assertRaises(UnknownTopicOrPartitionError):
-client._get_leader_for_partition(b'topic_unknown', 0)
+client._get_leader_for_partition('topic_unknown', 0)
 @patch('kafka.client.BrokerConnection')
 @patch('kafka.client.KafkaProtocol')

View File

@@ -29,11 +29,11 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
 @kafka_versions("all")
 def test_consume_none(self):
-fetch = FetchRequestPayload(self.bytes_topic, 0, 0, 1024)
+fetch = FetchRequestPayload(self.topic, 0, 0, 1024)
 fetch_resp, = self.client.send_fetch_request([fetch])
 self.assertEqual(fetch_resp.error, 0)
-self.assertEqual(fetch_resp.topic, self.bytes_topic)
+self.assertEqual(fetch_resp.topic, self.topic)
 self.assertEqual(fetch_resp.partition, 0)
 messages = list(fetch_resp.messages)
@@ -48,26 +48,26 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
 # ensure_topic_exists should fail with KafkaTimeoutError
 with self.assertRaises(KafkaTimeoutError):
-self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+self.client.ensure_topic_exists('this_topic_doesnt_exist', timeout=0)
 @kafka_versions('all')
 def test_send_produce_request_maintains_request_response_order(self):
-self.client.ensure_topic_exists(b'foo')
-self.client.ensure_topic_exists(b'bar')
+self.client.ensure_topic_exists('foo')
+self.client.ensure_topic_exists('bar')
 requests = [
 ProduceRequestPayload(
-b'foo', 0,
+'foo', 0,
 [create_message(b'a'), create_message(b'b')]),
 ProduceRequestPayload(
-b'bar', 1,
+'bar', 1,
 [create_message(b'a'), create_message(b'b')]),
 ProduceRequestPayload(
-b'foo', 1,
+'foo', 1,
 [create_message(b'a'), create_message(b'b')]),
 ProduceRequestPayload(
-b'bar', 0,
+'bar', 0,
 [create_message(b'a'), create_message(b'b')]),
 ]
@@ -85,12 +85,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
 @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
 def test_commit_fetch_offsets(self):
-req = OffsetCommitRequestPayload(self.bytes_topic, 0, 42, b"metadata")
-(resp,) = self.client.send_offset_commit_request(b"group", [req])
+req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
+(resp,) = self.client.send_offset_commit_request('group', [req])
 self.assertEqual(resp.error, 0)
-req = OffsetFetchRequestPayload(self.bytes_topic, 0)
-(resp,) = self.client.send_offset_fetch_request(b"group", [req])
+req = OffsetFetchRequestPayload(self.topic, 0)
+(resp,) = self.client.send_offset_fetch_request('group', [req])
 self.assertEqual(resp.error, 0)
 self.assertEqual(resp.offset, 42)
-self.assertEqual(resp.metadata, b"")  # Metadata isn't stored for now
+self.assertEqual(resp.metadata, '')  # Metadata isn't stored for now

View File

@@ -41,7 +41,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
 def send_messages(self, partition, messages):
 messages = [ create_message(self.msg(str(msg))) for msg in messages ]
-produce = ProduceRequestPayload(self.bytes_topic, partition, messages = messages)
+produce = ProduceRequestPayload(self.topic, partition, messages = messages)
 resp, = self.client.send_produce_request([produce])
 self.assertEqual(resp.error, 0)

View File

@@ -5,7 +5,6 @@ import time
 from kafka import KafkaClient, SimpleConsumer, KeyedProducer
 from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
 from kafka.producer.base import Producer
-from kafka.util import kafka_bytestring
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
@@ -165,7 +164,7 @@ class TestFailover(KafkaIntegrationTestCase):
 key = random_string(3).encode('utf-8')
 msg = random_string(10).encode('utf-8')
 producer.send_messages(topic, key, msg)
-if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
+if producer.partitioners[topic].partition(key) == 0:
 recovered = True
 except (FailedPayloadsError, ConnectionError):
 log.debug("caught exception sending message -- will retry")
@@ -203,7 +202,7 @@ class TestFailover(KafkaIntegrationTestCase):
 break
 def _kill_leader(self, topic, partition):
-leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]
+leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
 broker = self.brokers[leader.nodeId]
 broker.close()
 return broker

View File

@@ -488,7 +488,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 def assert_produce_request(self, messages, initial_offset, message_ct,
 partition=0):
-produce = ProduceRequestPayload(self.bytes_topic, partition, messages=messages)
+produce = ProduceRequestPayload(self.topic, partition, messages=messages)
 # There should only be one response message from the server.
 # This will throw an exception if there's more than one.
@@ -506,7 +506,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 # There should only be one response message from the server.
 # This will throw an exception if there's more than one.
-resp, = self.client.send_fetch_request([FetchRequestPayload(self.bytes_topic, partition, start_offset, 1024)])
+resp, = self.client.send_fetch_request([FetchRequestPayload(self.topic, partition, start_offset, 1024)])
 self.assertEqual(resp.error, 0)
 self.assertEqual(resp.partition, partition)

View File

@@ -12,7 +12,6 @@ from . import unittest
 from kafka import KafkaClient
 from kafka.common import OffsetRequestPayload
-from kafka.util import kafka_bytestring
 __all__ = [
 'random_string',
@@ -50,7 +49,6 @@ def get_open_port():
 class KafkaIntegrationTestCase(unittest.TestCase):
 create_client = True
 topic = None
-bytes_topic = None
 zk = None
 server = None
@@ -62,7 +60,6 @@ class KafkaIntegrationTestCase(unittest.TestCase):
 if not self.topic:
 topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
 self.topic = topic
-self.bytes_topic = topic.encode('utf-8')
 if self.create_client:
 self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
@@ -81,7 +78,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
 def current_offset(self, topic, partition):
 try:
-offsets, = self.client.send_offset_request([OffsetRequestPayload(kafka_bytestring(topic), partition, -1, 1)])
+offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)])
 except:
 # XXX: We've seen some UnknownErrors here and cant debug w/o server logs
 self.zk.child.dump_logs()