Merge pull request #234 from dpkp/high_level_consumer

A simpler kafka consumer
Mark Roberts
2014-12-16 20:49:13 -08:00
14 changed files with 941 additions and 68 deletions
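
In short, this PR adds a high-level `KafkaConsumer` alongside `SimpleConsumer` and `MultiProcessConsumer`. A minimal usage sketch, adapted from the docstring examples in the new kafka/consumer/kafka.py below; the broker address and topic name are placeholders:

```
from kafka import KafkaConsumer

# A very basic 'tail' consumer with no stored offset management,
# per the KafkaConsumer docstring; broker and topic are placeholders.
consumer = KafkaConsumer('my-topic',
                         metadata_broker_list='localhost:9092',
                         auto_offset_reset='smallest')

# Iteration blocks and yields KafkaMessage namedtuples:
# (topic, partition, offset, key, value)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)
```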


@@ -13,7 +13,7 @@ from kafka.protocol import (
)
from kafka.producer import SimpleProducer, KeyedProducer
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
-from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer

__all__ = [
    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',


@@ -119,9 +119,9 @@ class KafkaClient(object):
                response = conn.recv(requestId)
                return decoder_fn(response)
-            except Exception as e:
-                log.warning("Could not send request [%r] to server %s:%i, "
-                            "trying next server: %s" % (requestId, host, port, e))
+            except Exception:
+                log.exception("Could not send request [%r] to server %s:%i, "
+                              "trying next server" % (requestId, host, port))

        raise KafkaUnavailableError("All servers failed to process request")


@@ -63,6 +63,9 @@ Message = namedtuple("Message",
TopicAndPartition = namedtuple("TopicAndPartition",
                               ["topic", "partition"])

+KafkaMessage = namedtuple("KafkaMessage",
+    ["topic", "partition", "offset", "key", "value"])

#################
# Exceptions #
@@ -182,6 +185,10 @@ class ConsumerNoMoreData(KafkaError):
    pass

+class ConsumerTimeout(KafkaError):
+    pass

class ProtocolError(KafkaError):
    pass
@@ -190,6 +197,10 @@ class UnsupportedCodecError(KafkaError):
    pass

+class KafkaConfigurationError(KafkaError):
+    pass

kafka_errors = {
    -1 : UnknownError,
    0 : NoError,


@@ -1,6 +1,7 @@
from .simple import SimpleConsumer
from .multiprocess import MultiProcessConsumer
+from .kafka import KafkaConsumer

__all__ = [
-    'SimpleConsumer', 'MultiProcessConsumer'
+    'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer'
]

kafka/consumer/kafka.py (new file, 735 lines)

@@ -0,0 +1,735 @@
from __future__ import absolute_import
from collections import namedtuple
from copy import deepcopy
import logging
import random
import sys
import time
import six
from kafka.client import KafkaClient
from kafka.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
from kafka.util import kafka_bytestring
logger = logging.getLogger(__name__)
OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
DEFAULT_CONSUMER_CONFIG = {
'client_id': __name__,
'group_id': None,
'metadata_broker_list': None,
'socket_timeout_ms': 30 * 1000,
'fetch_message_max_bytes': 1024 * 1024,
'auto_offset_reset': 'largest',
'fetch_min_bytes': 1,
'fetch_wait_max_ms': 100,
'refresh_leader_backoff_ms': 200,
'deserializer_class': lambda msg: msg,
'auto_commit_enable': False,
'auto_commit_interval_ms': 60 * 1000,
'auto_commit_interval_messages': None,
'consumer_timeout_ms': -1,
# Currently unused
'socket_receive_buffer_bytes': 64 * 1024,
'num_consumer_fetchers': 1,
'default_fetcher_backoff_ms': 1000,
'queued_max_message_chunks': 10,
'rebalance_max_retries': 4,
'rebalance_backoff_ms': 2000,
}
BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
class KafkaConsumer(object):
"""
A simpler kafka consumer
```
# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1')
for m in kafka:
print m
# Alternate interface: next()
print kafka.next()
# Alternate interface: batch iteration
while True:
for m in kafka.fetch_messages():
print m
print "Done with batch - let's do another!"
```
```
# more advanced consumer -- multiple topics w/ auto commit offset management
kafka = KafkaConsumer('topic1', 'topic2',
group_id='my_consumer_group',
auto_commit_enable=True,
auto_commit_interval_ms=30 * 1000,
auto_offset_reset='smallest')
# Infinite iteration
for m in kafka:
process_message(m)
kafka.task_done(m)
# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)
# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()
# Batch process interface
while True:
for m in kafka.fetch_messages():
process_message(m)
kafka.task_done(m)
```
messages (m) are namedtuples with attributes:
m.topic: topic name (str)
m.partition: partition number (int)
m.offset: message offset on topic-partition log (int)
m.key: key (bytes - can be None)
m.value: message (output of deserializer_class - default is raw bytes)
Configuration settings can be passed to constructor,
otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
def __init__(self, *topics, **configs):
self.configure(**configs)
self.set_topic_partitions(*topics)
def configure(self, **configs):
"""
Configuration settings can be passed to constructor,
otherwise defaults will be used:
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
self._config = {}
for key in DEFAULT_CONSUMER_CONFIG:
self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])
if configs:
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
# Handle str/bytes conversions
for config_key in BYTES_CONFIGURATION_KEYS:
if isinstance(self._config[config_key], six.string_types):
logger.warning("Converting configuration key '%s' to bytes" %
config_key)
self._config[config_key] = self._config[config_key].encode('utf-8')
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
# Check auto-commit configuration
if self._config['auto_commit_enable']:
logger.info("Configuring consumer to auto-commit offsets")
self._reset_auto_commit()
if self._config['metadata_broker_list'] is None:
raise KafkaConfigurationError('metadata_broker_list required to '
'configure KafkaConsumer')
self._client = KafkaClient(self._config['metadata_broker_list'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0))
def set_topic_partitions(self, *topics):
"""
Set the topic/partitions to consume
Optionally specify offsets to start from
Accepts types:
str (utf-8): topic name (will consume all available partitions)
tuple: (topic, partition)
dict: { topic: partition }
{ topic: [partition list] }
{ topic: (partition tuple,) }
Optionally, offsets can be specified directly:
tuple: (topic, partition, offset)
dict: { (topic, partition): offset, ... }
Ex:
kafka = KafkaConsumer()
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
"""
self._topics = []
self._client.load_metadata_for_topics()
# Setup offsets
self._offsets = OffsetsStruct(fetch=dict(),
commit=dict(),
highwater=dict(),
task_done=dict())
# Handle different topic types
for arg in topics:
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
topic = kafka_bytestring(arg)
for partition in self._client.get_partition_ids_for_topic(topic):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
topic = kafka_bytestring(arg[0])
partition = arg[1]
if len(arg) == 3:
offset = arg[2]
self._offsets.fetch[(topic, partition)] = offset
self._consume_topic_partition(topic, partition)
# { topic: partitions, ... } dict
elif isinstance(arg, dict):
for key, value in six.iteritems(arg):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
topic = kafka_bytestring(key)
# topic: partition
if isinstance(value, int):
self._consume_topic_partition(topic, value)
# topic: [ partition1, partition2, ... ]
elif isinstance(value, (list, tuple)):
for partition in value:
self._consume_topic_partition(topic, partition)
else:
raise KafkaConfigurationError('Unknown topic type (dict key must be '
'int or list/tuple of ints)')
# (topic, partition): offset
elif isinstance(key, tuple):
topic = kafka_bytestring(key[0])
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[key] = value
else:
raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
# If we have a consumer group, try to fetch stored offsets
if self._config['group_id']:
self._get_commit_offsets()
# Update missing fetch/commit offsets
for topic_partition in self._topics:
# Commit offsets default is None
if topic_partition not in self._offsets.commit:
self._offsets.commit[topic_partition] = None
# Skip if we already have a fetch offset from user args
if topic_partition not in self._offsets.fetch:
# Fetch offsets default is (1) commit
if self._offsets.commit[topic_partition] is not None:
self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
# or (2) auto reset
else:
self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
# highwater marks (received from server on fetch response)
# and task_done (set locally by user)
# should always get initialized to None
self._reset_highwater_offsets()
self._reset_task_done_offsets()
# Reset message iterator in case we were in the middle of one
self._reset_message_iterator()
def next(self):
"""
Return a single message from the message iterator
If consumer_timeout_ms is set, will raise ConsumerTimeout
if no message is available
Otherwise blocks indefinitely
Note that this is also the method called internally during iteration:
```
for m in consumer:
pass
```
"""
self._set_consumer_timeout_start()
while True:
try:
return six.next(self._get_message_iterator())
# Handle batch completion
except StopIteration:
self._reset_message_iterator()
self._check_consumer_timeout()
def fetch_messages(self):
"""
Sends FetchRequests for all topic/partitions set for consumption
Returns a generator that yields KafkaMessage structs
after deserializing with the configured `deserializer_class`
Refreshes metadata on errors, and resets fetch offset on
OffsetOutOfRange, per the configured `auto_offset_reset` policy
Key configuration parameters:
`fetch_message_max_bytes`
`fetch_max_wait_ms`
`fetch_min_bytes`
`deserializer_class`
`auto_offset_reset`
"""
max_bytes = self._config['fetch_message_max_bytes']
max_wait_time = self._config['fetch_wait_max_ms']
min_bytes = self._config['fetch_min_bytes']
# Get current fetch offsets
offsets = self._offsets.fetch
if not offsets:
if not self._topics:
raise KafkaConfigurationError('No topics or partitions configured')
raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
fetches = []
for topic_partition, offset in six.iteritems(offsets):
fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
# client.send_fetch_request will collect topic/partition requests by leader
# and send each group as a single FetchRequest to the correct broker
try:
responses = self._client.send_fetch_request(fetches,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
fail_on_error=False)
except FailedPayloadsError:
logger.warning('FailedPayloadsError attempting to fetch data from kafka')
self._refresh_metadata_on_error()
return
for resp in responses:
topic_partition = (resp.topic, resp.partition)
try:
check_error(resp)
except OffsetOutOfRangeError:
logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
'(Highwatermark: %d)',
resp.topic, resp.partition,
offsets[topic_partition], resp.highwaterMark)
# Reset offset
self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
continue
except NotLeaderForPartitionError:
logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
resp.topic, resp.partition)
self._refresh_metadata_on_error()
continue
except RequestTimedOutError:
logger.warning("RequestTimedOutError for %s - %d",
resp.topic, resp.partition)
continue
# Track server highwater mark
self._offsets.highwater[topic_partition] = resp.highwaterMark
# Yield each message
# Kafka-python could raise an exception during iteration
# we are not catching -- user will need to address
for (offset, message) in resp.messages:
# deserializer_class could raise an exception here
msg = KafkaMessage(resp.topic,
resp.partition,
offset, message.key,
self._config['deserializer_class'](message.value))
# Only increment fetch offset if we safely got the message and deserialized
self._offsets.fetch[topic_partition] = offset + 1
# Then yield to user
yield msg
def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
"""
Request available fetch offsets for a single topic/partition
@param topic (str)
@param partition (int)
@param request_time_ms (int) -- Used to ask for all messages before a
certain time (ms). There are two special
values. Specify -1 to receive the latest
offset (i.e. the offset of the next coming
message) and -2 to receive the earliest
available offset. Note that because offsets
are pulled in descending order, asking for
the earliest offset will always return you
a single element.
@param max_num_offsets (int)
@return offsets (list)
"""
reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)
check_error(resp)
# Just for sanity..
# probably unnecessary
assert resp.topic == topic
assert resp.partition == partition
return resp.offsets
def offsets(self, group=None):
"""
Returns a copy of internal offsets struct
optional param: group [fetch|commit|task_done|highwater]
if no group specified, returns all groups
"""
if not group:
return {
'fetch': self.offsets('fetch'),
'commit': self.offsets('commit'),
'task_done': self.offsets('task_done'),
'highwater': self.offsets('highwater')
}
else:
return dict(deepcopy(getattr(self._offsets, group)))
def task_done(self, message):
"""
Mark a fetched message as consumed.
Offsets for messages marked as "task_done" will be stored back
to the kafka cluster for this consumer group on commit()
"""
topic_partition = (message.topic, message.partition)
offset = message.offset
# Warn on non-contiguous offsets
prev_done = self._offsets.task_done[topic_partition]
if prev_done is not None and offset != (prev_done + 1):
logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
offset, prev_done)
# Warn on smaller offsets than previous commit
# "commit" offsets are actually the offset of the next message to fetch.
prev_commit = self._offsets.commit[topic_partition]
if prev_commit is not None and ((offset + 1) <= prev_commit):
logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
offset, prev_commit)
self._offsets.task_done[topic_partition] = offset
# Check for auto-commit
if self._does_auto_commit_messages():
self._incr_auto_commit_message_count()
if self._should_auto_commit():
self.commit()
def commit(self):
"""
Store consumed message offsets (marked via task_done())
to kafka cluster for this consumer_group.
Note -- this functionality requires server version >=0.8.1.1
see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
# API supports storing metadata with each commit
# but for now it is unused
metadata = b''
offsets = self._offsets.task_done
commits = []
for topic_partition, task_done_offset in six.iteritems(offsets):
# Skip if None
if task_done_offset is None:
continue
# Commit offsets as the next offset to fetch
# which is consistent with the Java Client
# task_done is marked by messages consumed,
# so add one to mark the next message for fetching
commit_offset = (task_done_offset + 1)
# Skip if no change from previous committed
if commit_offset == self._offsets.commit[topic_partition]:
continue
commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(self._config['group_id'],
commits,
fail_on_error=False)
for r in resps:
check_error(r)
topic_partition = (r.topic, r.partition)
task_done = self._offsets.task_done[topic_partition]
self._offsets.commit[topic_partition] = (task_done + 1)
if self._config['auto_commit_enable']:
self._reset_auto_commit()
return True
else:
logger.info('No new offsets found to commit in group %s', self._config['group_id'])
return False
#
# Topic/partition management private methods
#
def _consume_topic_partition(self, topic, partition):
topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
if topic not in self._client.topic_partitions:
raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
if partition not in self._client.get_partition_ids_for_topic(topic):
raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
"in broker metadata" % (partition, topic))
logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
self._topics.append((topic, partition))
def _refresh_metadata_on_error(self):
refresh_ms = self._config['refresh_leader_backoff_ms']
jitter_pct = 0.20
sleep_ms = random.randint(
int((1.0 - 0.5 * jitter_pct) * refresh_ms),
int((1.0 + 0.5 * jitter_pct) * refresh_ms)
)
while True:
logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
time.sleep(sleep_ms / 1000.0)
try:
self._client.load_metadata_for_topics()
except KafkaUnavailableError:
logger.warning("Unable to refresh topic metadata... cluster unavailable")
self._check_consumer_timeout()
else:
logger.info("Topic metadata refreshed")
return
#
# Offset-managment private methods
#
def _get_commit_offsets(self):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
self._config['group_id'],
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
# API spec says server wont set an error here
# but 0.8.1.1 does actually...
except UnknownTopicOrPartitionError:
pass
# -1 offset signals no commit is currently stored
if resp.offset == -1:
self._offsets.commit[topic_partition] = None
# Otherwise we committed the stored offset
# and need to fetch the next one
else:
self._offsets.commit[topic_partition] = resp.offset
def _reset_highwater_offsets(self):
for topic_partition in self._topics:
self._offsets.highwater[topic_partition] = None
def _reset_task_done_offsets(self):
for topic_partition in self._topics:
self._offsets.task_done[topic_partition] = None
def _reset_partition_offset(self, topic_partition):
(topic, partition) = topic_partition
LATEST = -1
EARLIEST = -2
request_time_ms = None
if self._config['auto_offset_reset'] == 'largest':
request_time_ms = LATEST
elif self._config['auto_offset_reset'] == 'smallest':
request_time_ms = EARLIEST
else:
# Let's raise an reasonable exception type if user calls
# outside of an exception context
if sys.exc_info() == (None, None, None):
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
'valid auto_offset_reset setting '
'(largest|smallest)')
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise
(offset, ) = self.get_partition_offsets(topic, partition,
request_time_ms, max_num_offsets=1)
return offset
#
# Consumer Timeout private methods
#
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
if self._config['consumer_timeout_ms'] >= 0:
self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
#
# Autocommit private methods
#
def _should_auto_commit(self):
if self._does_auto_commit_ms():
if time.time() >= self._next_commit_time:
return True
if self._does_auto_commit_messages():
if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
return True
return False
def _reset_auto_commit(self):
self._uncommitted_message_count = 0
self._next_commit_time = None
if self._does_auto_commit_ms():
self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
def _incr_auto_commit_message_count(self, n=1):
self._uncommitted_message_count += n
def _does_auto_commit_ms(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_ms']
if conf is not None and conf > 0:
return True
return False
def _does_auto_commit_messages(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_messages']
if conf is not None and conf > 0:
return True
return False
#
# Message iterator private methods
#
def __iter__(self):
return self
def __next__(self):
return self.next()
def _get_message_iterator(self):
# Fetch a new batch if needed
if self._msg_iter is None:
self._msg_iter = self.fetch_messages()
return self._msg_iter
def _reset_message_iterator(self):
self._msg_iter = None
#
# python private methods
#
def __repr__(self):
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
for topic_partition in
self._topics])
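
That is the full kafka/consumer/kafka.py listing. As a hypothetical end-to-end sketch of the `task_done()`/`commit()` flow it introduces, based on the docstrings above; broker, group, and topic names and the `process()` handler are placeholders, not part of the PR:

```
# Hypothetical sketch of manual offset management with the new consumer.
# Broker, group, topic names and process() are placeholders.
from kafka import KafkaConsumer
from kafka.common import ConsumerTimeout

consumer = KafkaConsumer('my-topic',
                         metadata_broker_list='localhost:9092',
                         group_id='my-consumer-group',
                         auto_commit_enable=False,
                         consumer_timeout_ms=1000)

try:
    for message in consumer:
        process(message)             # user-supplied handler (placeholder)
        consumer.task_done(message)  # mark this offset as consumed locally
except ConsumerTimeout:
    pass                             # idle for 1 second -- stop consuming

# Store the task_done offsets (+1) to the cluster for this consumer group;
# per the commit() docstring this requires brokers >= 0.8.1.1.
consumer.commit()
```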


@@ -86,6 +86,18 @@ def group_by_topic_and_partition(tuples):
    return out

+
+def kafka_bytestring(s):
+    """
+    Takes a string or bytes instance
+    Returns bytes, encoding strings in utf-8 as necessary
+    """
+    if isinstance(s, six.binary_type):
+        return s
+    if isinstance(s, six.string_types):
+        return s.encode('utf-8')
+    raise TypeError(s)
+

class ReentrantTimer(object):
    """
    A timer that can be restarted, unlike threading.Timer
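
For reference, the new `kafka_bytestring()` helper added above behaves as follows; the values are illustrative only:

```
from kafka.util import kafka_bytestring

kafka_bytestring(b'topic1')  # returns b'topic1' (bytes pass through)
kafka_bytestring(u'topic1')  # returns b'topic1' (text is utf-8 encoded)
kafka_bytestring(1234)       # raises TypeError
```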


@@ -32,12 +32,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
        fetch = FetchRequest(self.topic, 0, 0, 1024)

        fetch_resp, = self.client.send_fetch_request([fetch])
-        self.assertEquals(fetch_resp.error, 0)
-        self.assertEquals(fetch_resp.topic, self.topic)
-        self.assertEquals(fetch_resp.partition, 0)
+        self.assertEqual(fetch_resp.error, 0)
+        self.assertEqual(fetch_resp.topic, self.topic)
+        self.assertEqual(fetch_resp.partition, 0)

        messages = list(fetch_resp.messages)
-        self.assertEquals(len(messages), 0)
+        self.assertEqual(len(messages), 0)

    @kafka_versions("all")
    def test_ensure_topic_exists(self):
@@ -58,10 +58,10 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
    def test_commit_fetch_offsets(self):
        req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
        (resp,) = self.client.send_offset_commit_request(b"group", [req])
-        self.assertEquals(resp.error, 0)
+        self.assertEqual(resp.error, 0)

        req = OffsetFetchRequest(self.topic, 0)
        (resp,) = self.client.send_offset_fetch_request(b"group", [req])
-        self.assertEquals(resp.error, 0)
-        self.assertEquals(resp.offset, 42)
-        self.assertEquals(resp.metadata, b"")  # Metadata isn't stored for now
+        self.assertEqual(resp.error, 0)
+        self.assertEqual(resp.offset, 42)
+        self.assertEqual(resp.metadata, b"")  # Metadata isn't stored for now


@@ -15,14 +15,14 @@ class TestCodec(unittest.TestCase):
        for i in xrange(1000):
            s1 = random_string(100)
            s2 = gzip_decode(gzip_encode(s1))
-            self.assertEquals(s1, s2)
+            self.assertEqual(s1, s2)

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy(self):
        for i in xrange(1000):
            s1 = random_string(100)
            s2 = snappy_decode(snappy_encode(s1))
-            self.assertEquals(s1, s2)
+            self.assertEqual(s1, s2)

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy_detect_xerial(self):
@@ -53,7 +53,7 @@ class TestCodec(unittest.TestCase):
            + struct.pack('!i', block_len) + random_snappy \
            + struct.pack('!i', block_len2) + random_snappy2 \

-        self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
+        self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))

    @unittest.skipUnless(has_snappy(), "Snappy not available")
    def test_snappy_encode_xerial(self):
@@ -68,5 +68,5 @@ class TestCodec(unittest.TestCase):
        to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)

        compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)

-        self.assertEquals(compressed, to_ensure)
+        self.assertEqual(compressed, to_ensure)


@@ -120,7 +120,7 @@ class ConnTest(unittest.TestCase):
    def test_recv(self):
-        self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
+        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])

    def test_recv__reconnects_on_dirty_conn(self):
@@ -151,8 +151,8 @@ class ConnTest(unittest.TestCase):
    def test_recv__doesnt_consume_extra_data_in_stream(self):
        # Here just test that each call to recv will return a single payload
-        self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
-        self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload2'])
+        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])

    def test_close__object_is_reusable(self):


@@ -2,9 +2,14 @@
from mock import MagicMock

from . import unittest

-from kafka.consumer import SimpleConsumer
+from kafka import SimpleConsumer, KafkaConsumer
+from kafka.common import KafkaConfigurationError

class TestKafkaConsumer(unittest.TestCase):
    def test_non_integer_partitions(self):
        with self.assertRaises(AssertionError):
            SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+
+    def test_broker_list_required(self):
+        with self.assertRaises(KafkaConfigurationError):
+            KafkaConsumer()


@@ -1,9 +1,12 @@
+import logging
import os

from six.moves import xrange

-from kafka import SimpleConsumer, MultiProcessConsumer, create_message
-from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
+from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
+from kafka.common import (
+    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES

from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -36,16 +39,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        messages = [ create_message(self.msg(str(msg))) for msg in messages ]
        produce = ProduceRequest(self.topic, partition, messages = messages)
        resp, = self.client.send_produce_request([produce])
-        self.assertEquals(resp.error, 0)
+        self.assertEqual(resp.error, 0)

        return [ x.value for x in messages ]

    def assert_message_count(self, messages, num_messages):
        # Make sure we got them all
-        self.assertEquals(len(messages), num_messages)
+        self.assertEqual(len(messages), num_messages)

        # Make sure there are no duplicates
-        self.assertEquals(len(set(messages)), num_messages)
+        self.assertEqual(len(set(messages)), num_messages)

+    def consumer(self, **kwargs):
+        if os.environ['KAFKA_VERSION'] == "0.8.0":
+            # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+            kwargs['auto_commit'] = False
+        else:
+            kwargs.setdefault('auto_commit', True)
+
+        consumer_class = kwargs.pop('consumer', SimpleConsumer)
+        group = kwargs.pop('group', self.id().encode('utf-8'))
+        topic = kwargs.pop('topic', self.topic)
+
+        if consumer_class == SimpleConsumer:
+            kwargs.setdefault('iter_timeout', 0)
+
+        return consumer_class(self.client, group, topic, **kwargs)
+
+    def kafka_consumer(self, **configs):
+        brokers = '%s:%d' % (self.server.host, self.server.port)
+        consumer = KafkaConsumer(self.topic,
+                                 metadata_broker_list=brokers,
+                                 **configs)
+        return consumer
+
    @kafka_versions("all")
    def test_simple_consumer(self):
@@ -114,9 +140,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

-        self.assertEquals(consumer.pending(), 20)
-        self.assertEquals(consumer.pending(partitions=[0]), 10)
-        self.assertEquals(consumer.pending(partitions=[1]), 10)
+        consumer = self.consumer()
+
+        self.assertEqual(consumer.pending(), 20)
+        self.assertEqual(consumer.pending(partitions=[0]), 10)
+        self.assertEqual(consumer.pending(partitions=[1]), 10)

        # move to last message, so one partition should have 1 pending
        # message and other 0
@@ -175,9 +203,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)

-        self.assertEquals(consumer.pending(), 20)
-        self.assertEquals(consumer.pending(partitions=[0]), 10)
-        self.assertEquals(consumer.pending(partitions=[1]), 10)
+        self.assertEqual(consumer.pending(), 20)
+        self.assertEqual(consumer.pending(partitions=[0]), 10)
+        self.assertEqual(consumer.pending(partitions=[1]), 10)

        consumer.stop()
@@ -225,7 +253,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        # Consume giant message successfully
        message = big_consumer.get_message(block=False, timeout=10)
        self.assertIsNotNone(message)
-        self.assertEquals(message.message.value, huge_message)
+        self.assertEqual(message.message.value, huge_message)

        big_consumer.stop()
@@ -273,20 +301,101 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
        consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
        messages = [ message for message in consumer ]
-        self.assertEquals(len(messages), 2)
+        self.assertEqual(len(messages), 2)

-    def consumer(self, **kwargs):
-        if os.environ['KAFKA_VERSION'] == "0.8.0":
-            # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
-            kwargs['auto_commit'] = False
-        else:
-            kwargs.setdefault('auto_commit', True)
-
-        consumer_class = kwargs.pop('consumer', SimpleConsumer)
-        group = kwargs.pop('group', self.id().encode('utf-8'))
-        topic = kwargs.pop('topic', self.topic)
-
-        if consumer_class == SimpleConsumer:
-            kwargs.setdefault('iter_timeout', 0)
-
-        return consumer_class(self.client, group, topic, **kwargs)
+    @kafka_versions("all")
+    def test_kafka_consumer(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+                                       consumer_timeout_ms=5000)
+        n = 0
+        messages = {0: set(), 1: set()}
+        logging.debug("kafka consumer offsets: %s" % consumer.offsets())
+        for m in consumer:
+            logging.debug("Consumed message %s" % repr(m))
+            n += 1
+            messages[m.partition].add(m.offset)
+            if n >= 200:
+                break
+
+        self.assertEqual(len(messages[0]), 100)
+        self.assertEqual(len(messages[1]), 100)
+
+    @kafka_versions("all")
+    def test_kafka_consumer__blocking(self):
+        TIMEOUT_MS = 500
+        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+                                       consumer_timeout_ms=TIMEOUT_MS)
+
+        # Ask for 5 messages, nothing in queue, block 5 seconds
+        with Timer() as t:
+            with self.assertRaises(ConsumerTimeout):
+                msg = consumer.next()
+        self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+        self.send_messages(0, range(0, 10))
+
+        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+        messages = set()
+        with Timer() as t:
+            for i in range(5):
+                msg = consumer.next()
+                messages.add((msg.partition, msg.offset))
+        self.assertEqual(len(messages), 5)
+        self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
+
+        # Ask for 10 messages, get 5 back, block 5 seconds
+        messages = set()
+        with Timer() as t:
+            with self.assertRaises(ConsumerTimeout):
+                for i in range(10):
+                    msg = consumer.next()
+                    messages.add((msg.partition, msg.offset))
+        self.assertEqual(len(messages), 5)
+        self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+    @kafka_versions("0.8.1", "0.8.1.1")
+    def test_kafka_consumer__offset_commit_resume(self):
+        GROUP_ID = random_string(10)
+
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Start a consumer
+        consumer1 = self.kafka_consumer(
+            group_id = GROUP_ID,
+            auto_commit_enable = True,
+            auto_commit_interval_ms = None,
+            auto_commit_interval_messages = 20,
+            auto_offset_reset='smallest',
+        )
+
+        # Grab the first 195 messages
+        output_msgs1 = []
+        for _ in xrange(195):
+            m = consumer1.next()
+            output_msgs1.append(m)
+            consumer1.task_done(m)
+        self.assert_message_count(output_msgs1, 195)
+
+        # The total offset across both partitions should be at 180
+        consumer2 = self.kafka_consumer(
+            group_id = GROUP_ID,
+            auto_commit_enable = True,
+            auto_commit_interval_ms = None,
+            auto_commit_interval_messages = 20,
+            consumer_timeout_ms = 100,
+            auto_offset_reset='smallest',
+        )
+
+        # 181-200
+        output_msgs2 = []
+        with self.assertRaises(ConsumerTimeout):
+            while True:
+                m = consumer2.next()
+                output_msgs2.append(m)
+        self.assert_message_count(output_msgs2, 20)
+        self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)


@@ -121,7 +121,7 @@ class TestFailover(KafkaIntegrationTestCase):
            logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
            resp = producer.send_messages(topic, partition, random_string(10))
            if len(resp) > 0:
-                self.assertEquals(resp[0].error, 0)
+                self.assertEqual(resp[0].error, 0)
            logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)

    def _kill_leader(self, topic, partition):


@@ -3,27 +3,27 @@ from . import unittest
class TestPackage(unittest.TestCase):
    def test_top_level_namespace(self):
        import kafka as kafka1
-        self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
-        self.assertEquals(kafka1.client.__name__, "kafka.client")
-        self.assertEquals(kafka1.codec.__name__, "kafka.codec")
+        self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient")
+        self.assertEqual(kafka1.client.__name__, "kafka.client")
+        self.assertEqual(kafka1.codec.__name__, "kafka.codec")

    def test_submodule_namespace(self):
        import kafka.client as client1
-        self.assertEquals(client1.__name__, "kafka.client")
-        self.assertEquals(client1.KafkaClient.__name__, "KafkaClient")
+        self.assertEqual(client1.__name__, "kafka.client")
+        self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")

        from kafka import client as client2
-        self.assertEquals(client2.__name__, "kafka.client")
-        self.assertEquals(client2.KafkaClient.__name__, "KafkaClient")
+        self.assertEqual(client2.__name__, "kafka.client")
+        self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")

        from kafka.client import KafkaClient as KafkaClient1
-        self.assertEquals(KafkaClient1.__name__, "KafkaClient")
+        self.assertEqual(KafkaClient1.__name__, "KafkaClient")

        from kafka.codec import gzip_encode as gzip_encode1
-        self.assertEquals(gzip_encode1.__name__, "gzip_encode")
+        self.assertEqual(gzip_encode1.__name__, "gzip_encode")

        from kafka import KafkaClient as KafkaClient2
-        self.assertEquals(KafkaClient2.__name__, "KafkaClient")
+        self.assertEqual(KafkaClient2.__name__, "KafkaClient")

        from kafka.codec import snappy_encode
-        self.assertEquals(snappy_encode.__name__, "snappy_encode")
+        self.assertEqual(snappy_encode.__name__, "snappy_encode")


@@ -251,7 +251,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])

        producer.stop()
@@ -301,7 +301,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        )

        # Batch mode is async. No ack
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        # It hasn't sent yet
        self.assert_fetch_offset(0, start_offset0, [])
@@ -314,7 +314,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        )

        # Batch mode is async. No ack
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        self.assert_fetch_offset(0, start_offset0, [
            self.msg("one"),
@@ -350,7 +350,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        )

        # Batch mode is async. No ack
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        # It hasn't sent yet
        self.assert_fetch_offset(0, start_offset0, [])
@@ -363,7 +363,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        )

        # Batch mode is async. No ack
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        # Wait the timeout out
        time.sleep(5)
@@ -389,7 +389,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer = SimpleProducer(self.client, async=True)
        resp = producer.send_messages(self.topic, self.msg("one"))
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
@@ -402,7 +402,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
        resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
-        self.assertEquals(len(resp), 0)
+        self.assertEqual(len(resp), 0)

        self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
@@ -429,9 +429,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
        resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])

-        self.assertEquals(resp.error, 0)
-        self.assertEquals(resp.partition, partition)
+        self.assertEqual(resp.error, 0)
+        self.assertEqual(resp.partition, partition)

        messages = [ x.message.value for x in resp.messages ]
        self.assertEqual(messages, expected_messages)
-        self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages))
+        self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))