Add KafkaTimeoutError (used by client.ensure_topic_exists) and add a test

This commit is contained in:
Dana Powers
2014-08-13 11:51:48 -07:00
parent 5a02d6393d
commit 18fdddf37f
3 changed files with 18 additions and 3 deletions

View File

@@ -8,8 +8,8 @@ import kafka.common
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
LeaderUnavailableError, KafkaUnavailableError,
PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
KafkaTimeoutError,
UnknownTopicOrPartitionError, NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
@@ -219,7 +219,7 @@ class KafkaClient(object):
self.load_metadata_for_topics(topic)
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {}".format(topic))
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
self.load_metadata_for_topics(topic)
time.sleep(.5)

View File

@@ -135,6 +135,10 @@ class KafkaUnavailableError(KafkaError):
pass
class KafkaTimeoutError(KafkaError):
pass
class LeaderUnavailableError(KafkaError):
pass

View File

@@ -49,6 +49,17 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
messages = list(fetch_resp.messages)
self.assertEquals(len(messages), 0)
@kafka_versions("all")
def test_ensure_topic_exists(self):
# assume that self.topic was created by setUp
# if so, this should succeed
self.client.ensure_topic_exists(self.topic, timeout=1)
# ensure_topic_exists should fail with KafkaTimeoutError
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists("this_topic_doesnt_exist", timeout=0)
####################
# Offset Tests #
####################