Merge pull request #174 from wizzat/new_topic
Handle New Topic Creation

--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,11 +1,11 @@
-import copy
-import logging
 import collections
+import copy
+import functools
+import itertools
+import logging
+import time
 import kafka.common
 
-from functools import partial
-from itertools import count
 from kafka.common import (TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
                           PartitionUnavailableError,
@@ -21,7 +21,7 @@ log = logging.getLogger("kafka")
 class KafkaClient(object):
 
     CLIENT_ID = "kafka-python"
-    ID_GEN = count()
+    ID_GEN = itertools.count()
 
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
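Only the name changes in this hunk; itertools.count() still yields 0, 1, 2, ... each time it is advanced, and the client draws its unique correlation ids from this generator. A quick illustration:

    import itertools

    ID_GEN = itertools.count()
    next(ID_GEN)   # 0
    next(ID_GEN)   # 1 -- each call hands out the next unique id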
@@ -213,6 +213,16 @@ class KafkaClient(object):
     def has_metadata_for_topic(self, topic):
         return topic in self.topic_partitions
 
+    def ensure_topic_exists(self, topic, timeout = 30):
+        start_time = time.time()
+
+        self.load_metadata_for_topics(topic)
+        while not self.has_metadata_for_topic(topic):
+            if time.time() > start_time + timeout:
+                raise KafkaTimeoutError("Unable to create topic {}".format(topic))
+            self.load_metadata_for_topics(topic)
+            time.sleep(.5)
+
     def close(self):
         for conn in self.conns.values():
             conn.close()
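How the new helper behaves: load_metadata_for_topics(topic) issues a metadata request naming the topic, which prompts the cluster to auto-create it when the broker is configured with auto.create.topics.enable=true (the Kafka default); the loop then re-polls every half second until metadata appears or KafkaTimeoutError is raised. A minimal usage sketch, assuming a local broker with auto-creation enabled (the address and topic name are placeholders):

    from kafka import KafkaClient

    client = KafkaClient('localhost:9092')   # placeholder broker address
    # Blocks until the cluster reports metadata for the topic, or raises
    # KafkaTimeoutError once the timeout (30 seconds by default) elapses.
    client.ensure_topic_exists('my_new_topic', timeout=10)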
@@ -289,7 +299,7 @@ class KafkaClient(object):
         order of input payloads
         """
 
-        encoder = partial(
+        encoder = functools.partial(
             KafkaProtocol.encode_produce_request,
             acks=acks,
             timeout=timeout)
@@ -321,7 +331,7 @@ class KafkaClient(object):
         to the same brokers.
         """
 
-        encoder = partial(KafkaProtocol.encode_fetch_request,
+        encoder = functools.partial(KafkaProtocol.encode_fetch_request,
                           max_wait_time=max_wait_time,
                           min_bytes=min_bytes)
 
@@ -359,7 +369,7 @@ class KafkaClient(object):
 
     def send_offset_commit_request(self, group, payloads=[],
                                    fail_on_error=True, callback=None):
-        encoder = partial(KafkaProtocol.encode_offset_commit_request,
+        encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
                           group=group)
         decoder = KafkaProtocol.decode_offset_commit_response
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
@@ -378,7 +388,7 @@ class KafkaClient(object):
     def send_offset_fetch_request(self, group, payloads=[],
                                   fail_on_error=True, callback=None):
 
-        encoder = partial(KafkaProtocol.encode_offset_fetch_request,
+        encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
                           group=group)
         decoder = KafkaProtocol.decode_offset_fetch_response
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
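The four encoder hunks above are purely mechanical: each bare partial(...) becomes functools.partial(...) to match the consolidated module-level imports. Behavior is identical, since functools.partial just pre-binds arguments. A quick illustration (encode here is a stand-in, not a real protocol function):

    import functools

    def encode(request, acks, timeout):
        return (request, acks, timeout)

    encoder = functools.partial(encode, acks=1, timeout=1000)
    # encoder(req) is now equivalent to encode(req, acks=1, timeout=1000)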

--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -10,7 +10,7 @@ from itertools import cycle
 from multiprocessing import Queue, Process
 
 from kafka.common import (
-    ProduceRequest, TopicAndPartition, UnsupportedCodecError
+    ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
 )
 from kafka.partitioner import HashedPartitioner
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
@@ -216,7 +216,10 @@ class SimpleProducer(Producer):
         if topic not in self.partition_cycles:
             if topic not in self.client.topic_partitions:
                 self.client.load_metadata_for_topics(topic)
-            self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+            try:
+                self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+            except KeyError:
+                raise UnknownTopicOrPartitionError(topic)
 
             # Randomize the initial partition that is returned
             if self.random_start:
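With this change, a send to a topic the cluster does not know about surfaces as UnknownTopicOrPartitionError rather than a bare KeyError from the partition-cycle cache. A sketch of how a caller might pair it with the new client helper, assuming broker-side auto-creation is enabled (the address and topic name are illustrative):

    from kafka import KafkaClient, SimpleProducer
    from kafka.common import UnknownTopicOrPartitionError

    client = KafkaClient('localhost:9092')   # placeholder address
    producer = SimpleProducer(client)

    try:
        producer.send_messages('brand_new_topic', 'payload')
    except UnknownTopicOrPartitionError:
        # Ask the cluster to create the topic, wait for metadata, retry.
        client.ensure_topic_exists('brand_new_topic')
        producer.send_messages('brand_new_topic', 'payload')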

--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -142,6 +142,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
         producer.stop()
 
+    @kafka_versions("all")
+    def test_produce__new_topic_fails_with_reasonable_error(self):
+        new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4()))
+        producer = SimpleProducer(self.client)
+
+        # At first it doesn't exist
+        with self.assertRaises(UnknownTopicOrPartitionError):
+            resp = producer.send_messages(new_topic, self.msg("one"))
+
     @kafka_versions("all")
     def test_producer_random_order(self):
         producer = SimpleProducer(self.client, random_start = True)

--- a/test/testutil.py
+++ b/test/testutil.py
@@ -13,7 +13,6 @@ from kafka import KafkaClient
 
 __all__ = [
     'random_string',
-    'ensure_topic_creation',
     'get_open_port',
     'kafka_versions',
     'KafkaIntegrationTestCase',
@@ -39,16 +38,6 @@ def kafka_versions(*versions):
         return wrapper
     return kafka_versions
 
-def ensure_topic_creation(client, topic_name, timeout = 30):
-    start_time = time.time()
-
-    client.load_metadata_for_topics(topic_name)
-    while not client.has_metadata_for_topic(topic_name):
-        if time.time() > start_time + timeout:
-            raise Exception("Unable to create topic %s" % topic_name)
-        client.load_metadata_for_topics(topic_name)
-        time.sleep(1)
-
 def get_open_port():
     sock = socket.socket()
     sock.bind(("", 0))
@@ -71,7 +60,7 @@ class KafkaIntegrationTestCase(unittest2.TestCase):
         if self.create_client:
             self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
 
-        ensure_topic_creation(self.client, self.topic)
+        self.client.ensure_topic_exists(self.topic)
 
         self._messages = {}
 