Merge pull request #223 from dpkp/metadata_refactor

Metadata Refactor

* add MetadataRequest and MetadataResponse namedtuples
* add TopicMetadata namedtuple
* add error codes to Topic and Partition Metadata
* add KafkaClient.send_metadata_request() method
* KafkaProtocol.decode_metadata_response changed to return a
  MetadataResponse object so that it is consistent with server api:
  [broker_list, topic_list]
* raise server exceptions in load_metadata_for_topics(*topics)
  unless topics is null (full refresh)
* Replace non-standard exceptions (LeaderUnavailable,
  PartitionUnavailable) with server standard exceptions
  (LeaderNotAvailableError, UnknownTopicOrPartitionError)
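
As a quick orientation, here is a minimal sketch of the refreshed error behavior described above (host and topic names are hypothetical; exception names per the notes):

    from kafka import KafkaClient
    from kafka.common import LeaderNotAvailableError, UnknownTopicOrPartitionError

    client = KafkaClient(hosts='localhost:9092')

    # Full refresh (no topic list): server error codes are logged, not raised
    client.load_metadata_for_topics()

    # Explicit topic list: server exceptions propagate to the caller
    try:
        client.load_metadata_for_topics('brand-new-topic')
    except LeaderNotAvailableError:
        pass   # broker auto-creates topics; partitions not assigned yet
    except UnknownTopicOrPartitionError:
        raise  # auto-create disabled; retrying will not help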

Conflicts:
	kafka/client.py
	test/test_client.py
	test/test_producer_integration.py
	test/test_protocol.py
Author: Dana Powers
Date:   2014-09-08 13:15:56 -07:00

8 changed files with 493 additions and 251 deletions

kafka/client.py

@@ -7,11 +7,11 @@ import logging
import time
import kafka.common
from kafka.common import (TopicAndPartition,
from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError, LeaderUnavailableError, KafkaUnavailableError,
KafkaTimeoutError,
UnknownTopicOrPartitionError, NotLeaderForPartitionError)
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -37,8 +37,9 @@ class KafkaClient(object):
# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
self.topic_partitions = {} # topic -> partition -> PartitionMetadata
self.load_metadata_for_topics() # bootstrap with all metadata
@@ -63,20 +64,37 @@ class KafkaClient(object):
Returns the leader for a partition or None if the partition exists
but has no leader.
PartitionUnavailableError will be raised if the topic or partition
UnknownTopicOrPartitionError will be raised if the topic or partition
is not part of the metadata.
LeaderNotAvailableError is raised if server has metadata, but there is
no current leader
"""
key = TopicAndPartition(topic, partition)
# reload metadata if the partition is not available
# or has no leader (broker is None)
if self.topics_to_brokers.get(key) is None:
self.load_metadata_for_topics(topic)
if key not in self.topics_to_brokers:
raise PartitionUnavailableError("%s not available" % str(key))
# Use cached metadata if it is there
if self.topics_to_brokers.get(key) is not None:
return self.topics_to_brokers[key]
return self.topics_to_brokers[key]
# Otherwise refresh metadata
# If topic does not already exist, this will raise
# UnknownTopicOrPartitionError if not auto-creating
# LeaderNotAvailableError otherwise until partitions are created
self.load_metadata_for_topics(topic)
# If the partition doesn't actually exist, raise
if partition not in self.topic_partitions[topic]:
raise UnknownTopicOrPartitionError(key)
# If there's no leader for the partition, raise
meta = self.topic_partitions[topic][partition]
if meta.leader == -1:
raise LeaderNotAvailableError(meta)
# Otherwise return the BrokerMetadata
return self.brokers[meta.leader]
def _next_id(self):
"""
@@ -84,20 +102,26 @@ class KafkaClient(object):
"""
return next(KafkaClient.ID_GEN)
def _send_broker_unaware_request(self, requestId, request):
def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
"""
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
for (host, port) in self.hosts:
requestId = self._next_id()
try:
conn = self._get_conn(host, port)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId,
payloads=payloads)
conn.send(requestId, request)
response = conn.recv(requestId)
return response
return decoder_fn(response)
except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (binascii.b2a_hex(request), host, port, e))
"trying next server: %s" % (requestId, host, port, e))
raise KafkaUnavailableError("All servers failed to process request")
@@ -109,8 +133,8 @@ class KafkaClient(object):
Params
======
payloads: list of object-like entities with a topic and
partition attribute
payloads: list of object-like entities with a topic (str) and
partition (int) attribute
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
keyword arguments
@@ -130,10 +154,6 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
if leader is None:
raise LeaderUnavailableError(
"Leader not available for topic %s partition %s" %
(payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -195,35 +215,6 @@ class KafkaClient(object):
#################
# Public API #
#################
def reset_topic_metadata(self, *topics):
for topic in topics:
try:
partitions = self.topic_partitions[topic]
except KeyError:
continue
for partition in partitions:
self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
del self.topic_partitions[topic]
def reset_all_metadata(self):
self.topics_to_brokers.clear()
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
return topic in self.topic_partitions
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
self.load_metadata_for_topics(topic)
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
self.load_metadata_for_topics(topic)
time.sleep(.5)
def close(self):
for conn in self.conns.values():
conn.close()
@@ -242,40 +233,142 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.reinit()
def load_metadata_for_topics(self, *topics):
"""
Discover brokers and metadata for a set of topics. This function is called
lazily whenever metadata is unavailable.
"""
request_id = self._next_id()
request = KafkaProtocol.encode_metadata_request(self.client_id,
request_id, topics)
response = self._send_broker_unaware_request(request_id, request)
(brokers, topics) = KafkaProtocol.decode_metadata_response(response)
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
self.brokers = brokers
for topic, partitions in topics.items():
self.reset_topic_metadata(topic)
if not partitions:
log.warning('No partitions for %s', topic)
def reset_topic_metadata(self, *topics):
for topic in topics:
try:
partitions = self.topic_partitions[topic]
except KeyError:
continue
self.topic_partitions[topic] = []
for partition, meta in partitions.items():
self.topic_partitions[topic].append(partition)
for partition in partitions:
self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
del self.topic_partitions[topic]
def reset_all_metadata(self):
self.topics_to_brokers.clear()
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
if topic not in self.topic_partitions:
return None
return list(self.topic_partitions[topic])
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
pass
except UnknownTopicOrPartitionError:
# Server is not configured to auto-create
# retrying in this case will not help
raise
time.sleep(.5)
def load_metadata_for_topics(self, *topics):
"""
Fetch broker and topic-partition metadata from the server,
and update internal data:
broker list, topic/partition list, and topic/partition -> broker map
This method should be called after receiving any error
@param: *topics (optional)
If a list of topics is provided, the metadata refresh will be limited
to the specified topics only.
Exceptions:
----------
If the broker is configured to not auto-create topics,
expect UnknownTopicOrPartitionError for topics that don't exist
If the broker is configured to auto-create topics,
expect LeaderNotAvailableError for new topics
until partitions have been initialized.
Exceptions *will not* be raised in a full refresh (i.e. no topic list)
In this case, error codes will be logged as errors
Partition-level errors will also not be raised here
(a single partition w/o a leader, for example)
"""
resp = self.send_metadata_request(topics)
log.debug("Broker metadata: %s", resp.brokers)
log.debug("Topic metadata: %s", resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
for topic_metadata in resp.topics:
topic = topic_metadata.topic
partitions = topic_metadata.partitions
self.reset_topic_metadata(topic)
# Errors expected for new topics
try:
kafka.common.check_error(topic_metadata)
except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
# Raise if the topic was passed in explicitly
if topic in topics:
raise
# Otherwise, just log a warning
log.error("Error loading topic metadata for %s: %s", topic, type(e))
continue
self.topic_partitions[topic] = {}
for partition_metadata in partitions:
partition = partition_metadata.partition
leader = partition_metadata.leader
self.topic_partitions[topic][partition] = partition_metadata
# Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
if meta.leader == -1:
log.warning('No leader for topic %s partition %s', topic, partition)
# Check for partition errors
try:
kafka.common.check_error(partition_metadata)
# If No Leader, topics_to_brokers topic_partition -> None
except LeaderNotAvailableError:
log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
continue
# If Known Broker, topic_partition -> BrokerMetadata
if leader in self.brokers:
self.topics_to_brokers[topic_part] = self.brokers[leader]
# If Unknown Broker, fake BrokerMetadata so we don't lose the id
# (not sure how this could happen. server could be in bad state)
else:
self.topics_to_brokers[topic_part] = brokers[meta.leader]
self.topics_to_brokers[topic_part] = BrokerMetadata(
leader, None, None
)
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response
return self._send_broker_unaware_request(payloads, encoder, decoder)
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
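
The broker-unaware path now generates a fresh correlation id per attempt and owns both encoding and decoding. The same try-each-host pattern as a self-contained sketch (the encoder/decoder callables, next_id, and get_conn signatures here are assumptions, not the module's exact internals):

    import logging

    log = logging.getLogger(__name__)

    def send_broker_unaware(hosts, get_conn, next_id, payloads, encoder_fn, decoder_fn):
        # Try each bootstrap host in turn; the first healthy one wins
        for host, port in hosts:
            request_id = next_id()  # new correlation id for every attempt
            try:
                conn = get_conn(host, port)
                request = encoder_fn(client_id=b'client',
                                     correlation_id=request_id,
                                     payloads=payloads)
                conn.send(request_id, request)
                return decoder_fn(conn.recv(request_id))
            except Exception as e:
                log.warning('Could not send request [%s] to server %s:%i, '
                            'trying next server: %s', request_id, host, port, e)
        raise RuntimeError('All servers failed to process request')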

kafka/common.py

@@ -17,8 +17,14 @@ OffsetRequest = namedtuple("OffsetRequest",
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
["topic", "partition", "offset", "metadata"])
MetadataRequest = namedtuple("MetadataRequest",
["topics"])
OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
@@ -36,16 +42,26 @@ OffsetFetchResponse = namedtuple("OffsetFetchResponse",
["topic", "partition", "offset",
"metadata", "error"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader",
"replicas", "isr"])
# Other useful structs
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
BrokerMetadata = namedtuple("BrokerMetadata",
["nodeId", "host", "port"])
TopicMetadata = namedtuple("TopicMetadata",
["topic", "error", "partitions"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
OffsetAndMessage = namedtuple("OffsetAndMessage",
["offset", "message"])
Message = namedtuple("Message",
["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition",
["topic", "partition"])
#################
@@ -60,6 +76,9 @@ class KafkaError(RuntimeError):
class BrokerResponseError(KafkaError):
pass
class NoError(BrokerResponseError):
errno = 0
message = 'SUCCESS'
class UnknownError(BrokerResponseError):
errno = -1
@@ -139,14 +158,6 @@ class KafkaTimeoutError(KafkaError):
pass
class LeaderUnavailableError(KafkaError):
pass
class PartitionUnavailableError(KafkaError):
pass
class FailedPayloadsError(KafkaError):
pass
@@ -181,6 +192,7 @@ class UnsupportedCodecError(KafkaError):
kafka_errors = {
-1 : UnknownError,
0 : NoError,
1 : OffsetOutOfRangeError,
2 : InvalidMessageError,
3 : UnknownTopicOrPartitionError,
@@ -198,7 +210,7 @@ kafka_errors = {
def check_error(response):
error = kafka_errors.get(response.error)
if error:
error = kafka_errors.get(response.error, UnknownError)
if error is not NoError:
raise error(response)
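
The net effect of the error-handling changes in this file: code 0 maps to a NoError class that is never raised, and unrecognized codes now fall back to UnknownError instead of passing silently. The pattern in isolation (a standalone sketch, not the full error table):

    class BrokerResponseError(RuntimeError):
        errno = None
        message = None

    class NoError(BrokerResponseError):
        errno = 0
        message = 'SUCCESS'

    class UnknownError(BrokerResponseError):
        errno = -1
        message = 'UNKNOWN'

    kafka_errors = {0: NoError, -1: UnknownError}  # plus the real broker codes

    def check_error(response):
        # Unknown codes no longer slip through -- default to UnknownError
        error = kafka_errors.get(response.error, UnknownError)
        if error is not NoError:
            raise error(response)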

kafka/consumer.py

@@ -91,7 +91,7 @@ class Consumer(object):
self.offsets = {}
if not partitions:
partitions = self.client.topic_partitions[topic]
partitions = self.client.get_partition_ids_for_topic(topic)
else:
assert all(isinstance(x, numbers.Integral) for x in partitions)
@@ -117,9 +117,9 @@ class Consumer(object):
def fetch_last_known_offsets(self, partitions=None):
if not partitions:
partitions = self.client.topic_partitions[self.topic]
partitions = self.client.get_partition_ids_for_topic(self.topic)
def get_or_init_offset_callback(resp):
def get_or_init_offset(resp):
try:
kafka.common.check_error(resp)
return resp.offset
@@ -128,10 +128,9 @@ class Consumer(object):
for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
(offset,) = self.client.send_offset_fetch_request(self.group, [req],
callback=get_or_init_offset_callback,
(resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
self.offsets[partition] = offset
self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):
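
The hunk above drops the per-request callback in favor of fetching raw responses (fail_on_error=False) and post-processing them locally. The except clause of get_or_init_offset is elided in the diff, so this sketch assumes it handles UnknownTopicOrPartitionError (no offset stored yet for the group) by starting at 0:

    import kafka.common
    from kafka.common import UnknownTopicOrPartitionError

    def get_or_init_offset(resp):
        try:
            kafka.common.check_error(resp)
            return resp.offset
        except UnknownTopicOrPartitionError:
            return 0  # no committed offset for this group/partition yet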

kafka/producer.py

@@ -247,16 +247,14 @@ class SimpleProducer(Producer):
def _next_partition(self, topic):
if topic not in self.partition_cycles:
if topic not in self.client.topic_partitions:
if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
try:
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
except KeyError:
raise UnknownTopicOrPartitionError(topic)
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
# Randomize the initial partition that is returned
if self.random_start:
num_partitions = len(self.client.topic_partitions[topic])
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
next(self.partition_cycles[topic])
@@ -305,12 +303,13 @@ class KeyedProducer(Producer):
def _next_partition(self, topic, key):
if topic not in self.partitioners:
if topic not in self.client.topic_partitions:
if not self.client.has_metadata_for_topic(topic):
self.client.load_metadata_for_topics(topic)
self.partitioners[topic] = \
self.partitioner_class(self.client.topic_partitions[topic])
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
partitioner = self.partitioners[topic]
return partitioner.partition(key, self.client.topic_partitions[topic])
return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
def send(self, topic, key, msg):
partition = self._next_partition(topic, key)
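
SimpleProducer's partition rotation is an itertools.cycle primed with a random number of next() calls. Standalone, the technique looks like this (helper name is illustrative):

    import random
    from itertools import cycle

    def make_partition_cycle(partition_ids, random_start=True):
        it = cycle(partition_ids)
        if random_start:
            # Advance to a random offset so concurrent producers
            # don't all start writing to the first partition
            for _ in range(random.randint(0, len(partition_ids) - 1)):
                next(it)
        return it

    partitions = make_partition_cycle([0, 1, 2])
    print(next(partitions), next(partitions), next(partitions))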

kafka/protocol.py

@@ -9,11 +9,12 @@ from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
ProduceResponse, FetchResponse, OffsetResponse,
OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
UnsupportedCodecError
Message, OffsetAndMessage, TopicAndPartition,
BrokerMetadata, TopicMetadata, PartitionMetadata,
MetadataResponse, ProduceResponse, FetchResponse,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProtocolError, BufferUnderflowError, ChecksumError,
ConsumerFetchSizeTooSmall, UnsupportedCodecError
)
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
@@ -343,7 +344,8 @@ class KafkaProtocol(object):
yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=None):
def encode_metadata_request(cls, client_id, correlation_id, topics=None,
payloads=None):
"""
Encode a MetadataRequest
@@ -353,7 +355,11 @@ class KafkaProtocol(object):
correlation_id: int
topics: list of strings
"""
topics = [] if topics is None else topics
if payloads is None:
topics = [] if topics is None else topics
else:
topics = payloads
message = cls._encode_message_header(client_id, correlation_id,
KafkaProtocol.METADATA_KEY)
@@ -376,28 +382,24 @@ class KafkaProtocol(object):
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
# Broker info
brokers = {}
brokers = []
for i in range(numbrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
brokers[nodeId] = BrokerMetadata(nodeId, host, port)
brokers.append(BrokerMetadata(nodeId, host, port))
# Topic info
((num_topics,), cur) = relative_unpack('>i', data, cur)
topic_metadata = {}
topic_metadata = []
for i in range(num_topics):
# NOTE: topic_error is discarded. Should probably be returned with
# the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = {}
partition_metadata = []
for j in range(num_partitions):
# NOTE: partition_error_code is discarded. Should probably be
# returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -407,13 +409,16 @@ class KafkaProtocol(object):
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
partition_metadata[partition] = \
PartitionMetadata(
topic_name, partition, leader, replicas, isr)
partition_metadata.append(
PartitionMetadata(topic_name, partition, leader,
replicas, isr, partition_error_code)
)
topic_metadata[topic_name] = partition_metadata
topic_metadata.append(
TopicMetadata(topic_name, topic_error, partition_metadata)
)
return brokers, topic_metadata
return MetadataResponse(brokers, topic_metadata)
@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id,
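
For reference, the broker section of a metadata response is laid out as (nodeId: int32, host: short string, port: int32), all big-endian. A minimal decode of just that section with the stdlib struct module (kafka.util's relative_unpack and read_short_string wrap the same calls; this sketch skips the topic section entirely):

    import struct
    from collections import namedtuple

    BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port'])

    def decode_brokers(data):
        correlation_id, num_brokers = struct.unpack('>ii', data[:8])
        cur = 8
        brokers = []
        for _ in range(num_brokers):
            (node_id,) = struct.unpack('>i', data[cur:cur + 4]); cur += 4
            (strlen,) = struct.unpack('>h', data[cur:cur + 2]); cur += 2
            host = data[cur:cur + strlen]; cur += strlen
            (port,) = struct.unpack('>i', data[cur:cur + 4]); cur += 4
            brokers.append(BrokerMetadata(node_id, host, port))
        return correlation_id, brokers

    # Round trip: correlation_id=3, one broker at kafka:9092 with nodeId 0
    payload = struct.pack('>iiih5si', 3, 1, 0, 5, b'kafka', 9092)
    print(decode_brokers(payload))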

test/test_client.py

@@ -1,15 +1,17 @@
import socket
from time import sleep
from mock import MagicMock, patch
from mock import ANY, MagicMock, patch
import six
from . import unittest
from kafka import KafkaClient
from kafka.common import (
ProduceRequest, BrokerMetadata, PartitionMetadata,
ProduceRequest, MetadataResponse,
BrokerMetadata, TopicMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
LeaderUnavailableError, PartitionUnavailableError,
LeaderNotAvailableError, NoError,
UnknownTopicOrPartitionError, KafkaTimeoutError,
ConnectionError
)
from kafka.conn import KafkaConnection
@@ -17,6 +19,10 @@ from kafka.protocol import KafkaProtocol, create_message
from test.testutil import Timer
NO_ERROR = 0
UNKNOWN_TOPIC_OR_PARTITION = 3
NO_LEADER = 5
class TestKafkaClient(unittest.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -64,10 +70,12 @@ class TestKafkaClient(unittest.TestCase):
req = KafkaProtocol.encode_metadata_request(b'client', 0)
with self.assertRaises(KafkaUnavailableError):
client._send_broker_unaware_request(1, req)
client._send_broker_unaware_request(payloads=['fake request'],
encoder_fn=MagicMock(return_value='fake encoded message'),
decoder_fn=lambda x: x)
for key, conn in six.iteritems(mocked_conns):
conn.send.assert_called_with(1, req)
conn.send.assert_called_with(ANY, 'fake encoded message')
def test_send_broker_unaware_request(self):
'Tests that call works when at least one of the hosts is available'
@@ -88,40 +96,46 @@ class TestKafkaClient(unittest.TestCase):
# patch to avoid making requests before we want it
with patch.object(KafkaClient, 'load_metadata_for_topics'):
with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
with patch.object(KafkaClient, '_next_id', return_value=1):
client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
req = KafkaProtocol.encode_metadata_request(b'client', 0)
resp = client._send_broker_unaware_request(1, req)
resp = client._send_broker_unaware_request(payloads=['fake request'],
encoder_fn=MagicMock(),
decoder_fn=lambda x: x)
self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
self.assertEqual('valid response', resp)
mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
"Load metadata for all topics"
conn.recv.return_value = 'response' # anything but None
brokers = {}
brokers[0] = BrokerMetadata(1, 'broker_1', 4567)
brokers[1] = BrokerMetadata(2, 'broker_2', 5678)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = {}
topics['topic_1'] = {
0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2])
}
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
}
topics['topic_no_partitions'] = {}
topics['topic_3'] = {
0: PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1]),
1: PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0]),
2: PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1])
}
protocol.decode_metadata_response.return_value = (brokers, topics)
topics = [
TopicMetadata('topic_1', NO_ERROR, [
PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
]),
TopicMetadata('topic_noleader', NO_ERROR, [
PartitionMetadata('topic_noleader', 0, -1, [], [],
NO_LEADER),
PartitionMetadata('topic_noleader', 1, -1, [], [],
NO_LEADER),
]),
TopicMetadata('topic_no_partitions', NO_LEADER, []),
TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_3', NO_ERROR, [
PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
# client loads metadata at init
client = KafkaClient(hosts=['broker_1:4567'])
@@ -134,6 +148,78 @@ class TestKafkaClient(unittest.TestCase):
TopicAndPartition('topic_3', 2): brokers[0]},
client.topics_to_brokers)
# if we ask for metadata explicitly, it should raise errors
with self.assertRaises(LeaderNotAvailableError):
client.load_metadata_for_topics('topic_no_partitions')
with self.assertRaises(UnknownTopicOrPartitionError):
client.load_metadata_for_topics('topic_unknown')
# This should not raise
client.load_metadata_for_topics('topic_noleader')
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_has_metadata_for_topic(self, protocol, conn):
conn.recv.return_value = 'response' # anything but None
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = [
TopicMetadata('topic_still_creating', NO_LEADER, []),
TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_noleaders', NO_ERROR, [
PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
# Topics with no partitions return False
self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
# Topic with partition metadata, but no leaders return True
self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_ensure_topic_exists(self, protocol, conn):
conn.recv.return_value = 'response' # anything but None
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = [
TopicMetadata('topic_still_creating', NO_LEADER, []),
TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_noleaders', NO_ERROR, [
PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
with self.assertRaises(UnknownTopicOrPartitionError):
client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
with self.assertRaises(KafkaTimeoutError):
client.ensure_topic_exists('topic_still_creating', timeout=1)
# This should not raise
client.ensure_topic_exists('topic_noleaders', timeout=1)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
@@ -141,70 +227,84 @@ class TestKafkaClient(unittest.TestCase):
conn.recv.return_value = 'response' # anything but None
brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)
topics = [
TopicMetadata('topic_no_partitions', NO_LEADER, [])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
# topic metadata is loaded but empty
self.assertDictEqual({}, client.topics_to_brokers)
topics['topic_no_partitions'] = {
0: PartitionMetadata('topic_no_partitions', 0, 0, [0, 1], [0, 1])
}
protocol.decode_metadata_response.return_value = (brokers, topics)
topics = [
TopicMetadata('topic_one_partition', NO_ERROR, [
PartitionMetadata('topic_one_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
# calling _get_leader_for_partition (from any broker aware request)
# will try loading metadata again for the same topic
leader = client._get_leader_for_partition('topic_no_partitions', 0)
leader = client._get_leader_for_partition('topic_one_partition', 0)
self.assertEqual(brokers[0], leader)
self.assertDictEqual({
TopicAndPartition('topic_no_partitions', 0): brokers[0]},
TopicAndPartition('topic_one_partition', 0): brokers[0]},
client.topics_to_brokers)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_unassigned_partitions(self, protocol, conn):
"Get leader raises if no partitions is defined for a topic"
conn.recv.return_value = 'response' # anything but None
brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)
topics = [
TopicMetadata('topic_no_partitions', NO_LEADER, []),
TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({}, client.topics_to_brokers)
with self.assertRaises(PartitionUnavailableError):
with self.assertRaises(LeaderNotAvailableError):
client._get_leader_for_partition('topic_no_partitions', 0)
with self.assertRaises(UnknownTopicOrPartitionError):
client._get_leader_for_partition('topic_unknown', 0)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_returns_none_when_noleader(self, protocol, conn):
"Getting leader for partitions returns None when the partiion has no leader"
def test_get_leader_exceptions_when_noleader(self, protocol, conn):
conn.recv.return_value = 'response' # anything but None
brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = {}
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
}
protocol.decode_metadata_response.return_value = (brokers, topics)
topics = [
TopicMetadata('topic_noleader', NO_ERROR, [
PartitionMetadata('topic_noleader', 0, -1, [], [],
NO_LEADER),
PartitionMetadata('topic_noleader', 1, -1, [], [],
NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual(
@@ -213,34 +313,48 @@ class TestKafkaClient(unittest.TestCase):
TopicAndPartition('topic_noleader', 1): None
},
client.topics_to_brokers)
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1]),
1: PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0])
}
protocol.decode_metadata_response.return_value = (brokers, topics)
# No leader partitions -- raise LeaderNotAvailableError
with self.assertRaises(LeaderNotAvailableError):
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
with self.assertRaises(LeaderNotAvailableError):
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
# Unknown partitions -- raise UnknownTopicOrPartitionError
with self.assertRaises(UnknownTopicOrPartitionError):
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
topics = [
TopicMetadata('topic_noleader', NO_ERROR, [
PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_send_produce_request_raises_when_noleader(self, protocol, conn):
"Send producer request raises LeaderUnavailableError if leader is not available"
"Send producer request raises LeaderNotAvailableError if leader is not available"
conn.recv.return_value = 'response' # anything but None
brokers = {}
brokers[0] = BrokerMetadata(0, 'broker_1', 4567)
brokers[1] = BrokerMetadata(1, 'broker_2', 5678)
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = {}
topics['topic_noleader'] = {
0: PartitionMetadata('topic_noleader', 0, -1, [], []),
1: PartitionMetadata('topic_noleader', 1, -1, [], [])
}
protocol.decode_metadata_response.return_value = (brokers, topics)
topics = [
TopicMetadata('topic_noleader', NO_ERROR, [
PartitionMetadata('topic_noleader', 0, -1, [], [],
NO_LEADER),
PartitionMetadata('topic_noleader', 1, -1, [], [],
NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
@@ -248,7 +362,32 @@ class TestKafkaClient(unittest.TestCase):
"topic_noleader", 0,
[create_message("a"), create_message("b")])]
with self.assertRaises(LeaderUnavailableError):
with self.assertRaises(LeaderNotAvailableError):
client.send_produce_request(requests)
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
conn.recv.return_value = 'response' # anything but None
brokers = [
BrokerMetadata(0, 'broker_1', 4567),
BrokerMetadata(1, 'broker_2', 5678)
]
topics = [
TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
client = KafkaClient(hosts=['broker_1:4567'])
requests = [ProduceRequest(
"topic_doesnt_exist", 0,
[create_message("a"), create_message("b")])]
with self.assertRaises(UnknownTopicOrPartitionError):
client.send_produce_request(requests)
def test_timeout(self):
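
All of these tests share one fixture pattern: patch kafka.client.KafkaProtocol so decode_metadata_response returns a canned MetadataResponse, patch KafkaConnection so recv returns something non-None, then construct the client (which loads metadata at init). Condensed into a helper (helper and topic names hypothetical):

    from mock import patch  # unittest.mock in the stdlib on Python 3

    from kafka import KafkaClient
    from kafka.common import BrokerMetadata, MetadataResponse, TopicMetadata

    def make_mocked_client(topics):
        with patch('kafka.client.KafkaConnection') as conn, \
             patch('kafka.client.KafkaProtocol') as protocol:
            conn.recv.return_value = 'response'  # anything but None
            brokers = [BrokerMetadata(0, 'broker_1', 4567)]
            protocol.decode_metadata_response.return_value = \
                MetadataResponse(brokers, topics)
            return KafkaClient(hosts=['broker_1:4567'])

    client = make_mocked_client([TopicMetadata('topic_1', 0, [])])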

test/test_producer_integration.py

@@ -9,7 +9,8 @@ from kafka import (
)
from kafka.codec import has_snappy
from kafka.common import (
FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
FetchRequest, ProduceRequest,
UnknownTopicOrPartitionError, LeaderNotAvailableError
)
from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -165,7 +166,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(self.client)
# At first it doesn't exist
with self.assertRaises(UnknownTopicOrPartitionError):
with self.assertRaises((UnknownTopicOrPartitionError,
LeaderNotAvailableError)):
producer.send_messages(new_topic, self.msg("one"))
@kafka_versions("all")

test/test_protocol.py

@@ -10,9 +10,10 @@ from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
BrokerMetadata, PartitionMetadata, ProtocolError,
UnsupportedCodecError
ProduceResponse, FetchResponse, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
ProtocolError
)
from kafka.protocol import (
ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -451,21 +452,22 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(encoded, expected)
def _create_encoded_metadata_response(self, broker_data, topic_data,
topic_errors, partition_errors):
encoded = struct.pack('>ii', 3, len(broker_data))
for node_id, broker in six.iteritems(broker_data):
encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
def _create_encoded_metadata_response(self, brokers, topics):
encoded = struct.pack('>ii', 3, len(brokers))
for broker in brokers:
encoded += struct.pack('>ih%dsi' % len(broker.host), broker.nodeId,
len(broker.host), broker.host, broker.port)
encoded += struct.pack('>i', len(topic_data))
for topic, partitions in six.iteritems(topic_data):
encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
len(topic), topic, len(partitions))
for partition, metadata in six.iteritems(partitions):
encoded += struct.pack('>i', len(topics))
for topic in topics:
encoded += struct.pack('>hh%dsi' % len(topic.topic),
topic.error, len(topic.topic),
topic.topic, len(topic.partitions))
for metadata in topic.partitions:
encoded += struct.pack('>hiii',
partition_errors[(topic, partition)],
partition, metadata.leader,
metadata.error,
metadata.partition,
metadata.leader,
len(metadata.replicas))
if len(metadata.replicas) > 0:
encoded += struct.pack('>%di' % len(metadata.replicas),
@@ -475,35 +477,26 @@ class TestProtocol(unittest.TestCase):
if len(metadata.isr) > 0:
encoded += struct.pack('>%di' % len(metadata.isr),
*metadata.isr)
return encoded
def test_decode_metadata_response(self):
node_brokers = {
0: BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
1: BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
3: BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
}
node_brokers = [
BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
]
topic_partitions = {
b"topic1": {
0: PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,)),
1: PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1))
},
b"topic2": {
0: PartitionMetadata(b"topic2", 0, 0, (), ())
}
}
topic_errors = {b"topic1": 0, b"topic2": 1}
partition_errors = {
(b"topic1", 0): 0,
(b"topic1", 1): 1,
(b"topic2", 0): 0
}
topic_partitions = [
TopicMetadata(b"topic1", 0, [
PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
]),
TopicMetadata(b"topic2", 1, [
PartitionMetadata(b"topic2", 0, 0, (), (), 0),
]),
]
encoded = self._create_encoded_metadata_response(node_brokers,
topic_partitions,
topic_errors,
partition_errors)
topic_partitions)
decoded = KafkaProtocol.decode_metadata_response(encoded)
self.assertEqual(decoded, (node_brokers, topic_partitions))
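
Because both TopicMetadata and PartitionMetadata now carry an error field, the same kafka.common.check_error helper works uniformly at either level. A small usage sketch (error code 3 is UnknownTopicOrPartition, per the kafka_errors table above):

    import kafka.common
    from kafka.common import TopicMetadata

    ok = TopicMetadata(b'topic1', 0, [])
    broken = TopicMetadata(b'topic2', 3, [])

    kafka.common.check_error(ok)  # error 0 -> NoError -> returns quietly
    try:
        kafka.common.check_error(broken)
    except kafka.common.UnknownTopicOrPartitionError as e:
        print('metadata error for', e.args[0].topic)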