Separate consumers/producers/partitioners
This commit is contained in:
@@ -1,698 +0,0 @@
|
|||||||
from __future__ import absolute_import
|
|
||||||
|
|
||||||
try:
|
|
||||||
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
|
|
||||||
except ImportError: # python 2
|
|
||||||
from itertools import izip_longest as izip_longest, repeat
|
|
||||||
import logging
|
|
||||||
import time
|
|
||||||
import numbers
|
|
||||||
from threading import Lock
|
|
||||||
from multiprocessing import Process, Queue as MPQueue, Event, Value
|
|
||||||
|
|
||||||
import six
|
|
||||||
|
|
||||||
try:
|
|
||||||
from Queue import Empty, Queue
|
|
||||||
except ImportError: # python 2
|
|
||||||
from queue import Empty, Queue
|
|
||||||
|
|
||||||
import kafka.common
|
|
||||||
from kafka.common import (
|
|
||||||
FetchRequest, OffsetRequest,
|
|
||||||
OffsetCommitRequest, OffsetFetchRequest,
|
|
||||||
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
|
|
||||||
UnknownTopicOrPartitionError
|
|
||||||
)
|
|
||||||
|
|
||||||
from kafka.util import ReentrantTimer
|
|
||||||
|
|
||||||
log = logging.getLogger("kafka")
|
|
||||||
|
|
||||||
AUTO_COMMIT_MSG_COUNT = 100
|
|
||||||
AUTO_COMMIT_INTERVAL = 5000
|
|
||||||
|
|
||||||
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
|
|
||||||
FETCH_MAX_WAIT_TIME = 100
|
|
||||||
FETCH_MIN_BYTES = 4096
|
|
||||||
FETCH_BUFFER_SIZE_BYTES = 4096
|
|
||||||
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
|
|
||||||
|
|
||||||
ITER_TIMEOUT_SECONDS = 60
|
|
||||||
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
|
|
||||||
|
|
||||||
|
|
||||||
class FetchContext(object):
|
|
||||||
"""
|
|
||||||
Class for managing the state of a consumer during fetch
|
|
||||||
"""
|
|
||||||
def __init__(self, consumer, block, timeout):
|
|
||||||
self.consumer = consumer
|
|
||||||
self.block = block
|
|
||||||
|
|
||||||
if block:
|
|
||||||
if not timeout:
|
|
||||||
timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
|
|
||||||
self.timeout = timeout * 1000
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
"""Set fetch values based on blocking status"""
|
|
||||||
self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
|
|
||||||
self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
|
|
||||||
if self.block:
|
|
||||||
self.consumer.fetch_max_wait_time = self.timeout
|
|
||||||
self.consumer.fetch_min_bytes = 1
|
|
||||||
else:
|
|
||||||
self.consumer.fetch_min_bytes = 0
|
|
||||||
|
|
||||||
def __exit__(self, type, value, traceback):
|
|
||||||
"""Reset values"""
|
|
||||||
self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
|
|
||||||
self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
|
|
||||||
|
|
||||||
|
|
||||||
class Consumer(object):
|
|
||||||
"""
|
|
||||||
Base class to be used by other consumers. Not to be used directly
|
|
||||||
|
|
||||||
This base class provides logic for
|
|
||||||
* initialization and fetching metadata of partitions
|
|
||||||
* Auto-commit logic
|
|
||||||
* APIs for fetching pending message count
|
|
||||||
"""
|
|
||||||
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
|
|
||||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
|
||||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
|
|
||||||
|
|
||||||
self.client = client
|
|
||||||
self.topic = topic
|
|
||||||
self.group = group
|
|
||||||
self.client.load_metadata_for_topics(topic)
|
|
||||||
self.offsets = {}
|
|
||||||
|
|
||||||
if not partitions:
|
|
||||||
partitions = self.client.get_partition_ids_for_topic(topic)
|
|
||||||
else:
|
|
||||||
assert all(isinstance(x, numbers.Integral) for x in partitions)
|
|
||||||
|
|
||||||
# Variables for handling offset commits
|
|
||||||
self.commit_lock = Lock()
|
|
||||||
self.commit_timer = None
|
|
||||||
self.count_since_commit = 0
|
|
||||||
self.auto_commit = auto_commit
|
|
||||||
self.auto_commit_every_n = auto_commit_every_n
|
|
||||||
self.auto_commit_every_t = auto_commit_every_t
|
|
||||||
|
|
||||||
# Set up the auto-commit timer
|
|
||||||
if auto_commit is True and auto_commit_every_t is not None:
|
|
||||||
self.commit_timer = ReentrantTimer(auto_commit_every_t,
|
|
||||||
self.commit)
|
|
||||||
self.commit_timer.start()
|
|
||||||
|
|
||||||
if auto_commit:
|
|
||||||
self.fetch_last_known_offsets(partitions)
|
|
||||||
else:
|
|
||||||
for partition in partitions:
|
|
||||||
self.offsets[partition] = 0
|
|
||||||
|
|
||||||
def fetch_last_known_offsets(self, partitions=None):
|
|
||||||
if not partitions:
|
|
||||||
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
|
||||||
|
|
||||||
def get_or_init_offset(resp):
|
|
||||||
try:
|
|
||||||
kafka.common.check_error(resp)
|
|
||||||
return resp.offset
|
|
||||||
except UnknownTopicOrPartitionError:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
for partition in partitions:
|
|
||||||
req = OffsetFetchRequest(self.topic, partition)
|
|
||||||
(resp,) = self.client.send_offset_fetch_request(self.group, [req],
|
|
||||||
fail_on_error=False)
|
|
||||||
self.offsets[partition] = get_or_init_offset(resp)
|
|
||||||
self.fetch_offsets = self.offsets.copy()
|
|
||||||
|
|
||||||
def commit(self, partitions=None):
|
|
||||||
"""
|
|
||||||
Commit offsets for this consumer
|
|
||||||
|
|
||||||
partitions: list of partitions to commit, default is to commit
|
|
||||||
all of them
|
|
||||||
"""
|
|
||||||
|
|
||||||
# short circuit if nothing happened. This check is kept outside
|
|
||||||
# to prevent un-necessarily acquiring a lock for checking the state
|
|
||||||
if self.count_since_commit == 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
with self.commit_lock:
|
|
||||||
# Do this check again, just in case the state has changed
|
|
||||||
# during the lock acquiring timeout
|
|
||||||
if self.count_since_commit == 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
reqs = []
|
|
||||||
if not partitions: # commit all partitions
|
|
||||||
partitions = self.offsets.keys()
|
|
||||||
|
|
||||||
for partition in partitions:
|
|
||||||
offset = self.offsets[partition]
|
|
||||||
log.debug("Commit offset %d in SimpleConsumer: "
|
|
||||||
"group=%s, topic=%s, partition=%s" %
|
|
||||||
(offset, self.group, self.topic, partition))
|
|
||||||
|
|
||||||
reqs.append(OffsetCommitRequest(self.topic, partition,
|
|
||||||
offset, None))
|
|
||||||
|
|
||||||
resps = self.client.send_offset_commit_request(self.group, reqs)
|
|
||||||
for resp in resps:
|
|
||||||
kafka.common.check_error(resp)
|
|
||||||
|
|
||||||
self.count_since_commit = 0
|
|
||||||
|
|
||||||
def _auto_commit(self):
|
|
||||||
"""
|
|
||||||
Check if we have to commit based on number of messages and commit
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Check if we are supposed to do an auto-commit
|
|
||||||
if not self.auto_commit or self.auto_commit_every_n is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
if self.count_since_commit >= self.auto_commit_every_n:
|
|
||||||
self.commit()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self.commit_timer is not None:
|
|
||||||
self.commit_timer.stop()
|
|
||||||
self.commit()
|
|
||||||
|
|
||||||
def pending(self, partitions=None):
|
|
||||||
"""
|
|
||||||
Gets the pending message count
|
|
||||||
|
|
||||||
partitions: list of partitions to check for, default is to check all
|
|
||||||
"""
|
|
||||||
if not partitions:
|
|
||||||
partitions = self.offsets.keys()
|
|
||||||
|
|
||||||
total = 0
|
|
||||||
reqs = []
|
|
||||||
|
|
||||||
for partition in partitions:
|
|
||||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
|
||||||
|
|
||||||
resps = self.client.send_offset_request(reqs)
|
|
||||||
for resp in resps:
|
|
||||||
partition = resp.partition
|
|
||||||
pending = resp.offsets[0]
|
|
||||||
offset = self.offsets[partition]
|
|
||||||
total += pending - offset - (1 if offset > 0 else 0)
|
|
||||||
|
|
||||||
return total
|
|
||||||
|
|
||||||
|
|
||||||
class SimpleConsumer(Consumer):
|
|
||||||
"""
|
|
||||||
A simple consumer implementation that consumes all/specified partitions
|
|
||||||
for a topic
|
|
||||||
|
|
||||||
client: a connected KafkaClient
|
|
||||||
group: a name for this consumer, used for offset storage and must be unique
|
|
||||||
topic: the topic to consume
|
|
||||||
partitions: An optional list of partitions to consume the data from
|
|
||||||
|
|
||||||
auto_commit: default True. Whether or not to auto commit the offsets
|
|
||||||
auto_commit_every_n: default 100. How many messages to consume
|
|
||||||
before a commit
|
|
||||||
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
|
||||||
wait before commit
|
|
||||||
fetch_size_bytes: number of bytes to request in a FetchRequest
|
|
||||||
buffer_size: default 4K. Initial number of bytes to tell kafka we
|
|
||||||
have available. This will double as needed.
|
|
||||||
max_buffer_size: default 16K. Max number of bytes to tell kafka we have
|
|
||||||
available. None means no limit.
|
|
||||||
iter_timeout: default None. How much time (in seconds) to wait for a
|
|
||||||
message in the iterator before exiting. None means no
|
|
||||||
timeout, so it will wait forever.
|
|
||||||
|
|
||||||
Auto commit details:
|
|
||||||
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
|
||||||
reset one another when one is triggered. These triggers simply call the
|
|
||||||
commit method on this class. A manual call to commit will also reset
|
|
||||||
these triggers
|
|
||||||
"""
|
|
||||||
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
|
|
||||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
|
||||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
|
|
||||||
fetch_size_bytes=FETCH_MIN_BYTES,
|
|
||||||
buffer_size=FETCH_BUFFER_SIZE_BYTES,
|
|
||||||
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
|
|
||||||
iter_timeout=None):
|
|
||||||
super(SimpleConsumer, self).__init__(
|
|
||||||
client, group, topic,
|
|
||||||
partitions=partitions,
|
|
||||||
auto_commit=auto_commit,
|
|
||||||
auto_commit_every_n=auto_commit_every_n,
|
|
||||||
auto_commit_every_t=auto_commit_every_t)
|
|
||||||
|
|
||||||
if max_buffer_size is not None and buffer_size > max_buffer_size:
|
|
||||||
raise ValueError("buffer_size (%d) is greater than "
|
|
||||||
"max_buffer_size (%d)" %
|
|
||||||
(buffer_size, max_buffer_size))
|
|
||||||
self.buffer_size = buffer_size
|
|
||||||
self.max_buffer_size = max_buffer_size
|
|
||||||
self.partition_info = False # Do not return partition info in msgs
|
|
||||||
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
|
|
||||||
self.fetch_min_bytes = fetch_size_bytes
|
|
||||||
self.fetch_offsets = self.offsets.copy()
|
|
||||||
self.iter_timeout = iter_timeout
|
|
||||||
self.queue = Queue()
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
|
|
||||||
(self.group, self.topic, str(self.offsets.keys()))
|
|
||||||
|
|
||||||
def provide_partition_info(self):
|
|
||||||
"""
|
|
||||||
Indicates that partition info must be returned by the consumer
|
|
||||||
"""
|
|
||||||
self.partition_info = True
|
|
||||||
|
|
||||||
def seek(self, offset, whence):
|
|
||||||
"""
|
|
||||||
Alter the current offset in the consumer, similar to fseek
|
|
||||||
|
|
||||||
offset: how much to modify the offset
|
|
||||||
whence: where to modify it from
|
|
||||||
0 is relative to the earliest available offset (head)
|
|
||||||
1 is relative to the current offset
|
|
||||||
2 is relative to the latest known offset (tail)
|
|
||||||
"""
|
|
||||||
|
|
||||||
if whence == 1: # relative to current position
|
|
||||||
for partition, _offset in self.offsets.items():
|
|
||||||
self.offsets[partition] = _offset + offset
|
|
||||||
elif whence in (0, 2): # relative to beginning or end
|
|
||||||
# divide the request offset by number of partitions,
|
|
||||||
# distribute the remained evenly
|
|
||||||
(delta, rem) = divmod(offset, len(self.offsets))
|
|
||||||
deltas = {}
|
|
||||||
for partition, r in izip_longest(self.offsets.keys(),
|
|
||||||
repeat(1, rem), fillvalue=0):
|
|
||||||
deltas[partition] = delta + r
|
|
||||||
|
|
||||||
reqs = []
|
|
||||||
for partition in self.offsets.keys():
|
|
||||||
if whence == 0:
|
|
||||||
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
|
|
||||||
elif whence == 2:
|
|
||||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
|
||||||
else:
|
|
||||||
pass
|
|
||||||
|
|
||||||
resps = self.client.send_offset_request(reqs)
|
|
||||||
for resp in resps:
|
|
||||||
self.offsets[resp.partition] = \
|
|
||||||
resp.offsets[0] + deltas[resp.partition]
|
|
||||||
else:
|
|
||||||
raise ValueError("Unexpected value for `whence`, %d" % whence)
|
|
||||||
|
|
||||||
# Reset queue and fetch offsets since they are invalid
|
|
||||||
self.fetch_offsets = self.offsets.copy()
|
|
||||||
if self.auto_commit:
|
|
||||||
self.count_since_commit += 1
|
|
||||||
self.commit()
|
|
||||||
|
|
||||||
self.queue = Queue()
|
|
||||||
|
|
||||||
def get_messages(self, count=1, block=True, timeout=0.1):
|
|
||||||
"""
|
|
||||||
Fetch the specified number of messages
|
|
||||||
|
|
||||||
count: Indicates the maximum number of messages to be fetched
|
|
||||||
block: If True, the API will block till some messages are fetched.
|
|
||||||
timeout: If block is True, the function will block for the specified
|
|
||||||
time (in seconds) until count messages is fetched. If None,
|
|
||||||
it will block forever.
|
|
||||||
"""
|
|
||||||
messages = []
|
|
||||||
if timeout is not None:
|
|
||||||
max_time = time.time() + timeout
|
|
||||||
|
|
||||||
new_offsets = {}
|
|
||||||
while count > 0 and (timeout is None or timeout > 0):
|
|
||||||
result = self._get_message(block, timeout, get_partition_info=True,
|
|
||||||
update_offset=False)
|
|
||||||
if result:
|
|
||||||
partition, message = result
|
|
||||||
if self.partition_info:
|
|
||||||
messages.append(result)
|
|
||||||
else:
|
|
||||||
messages.append(message)
|
|
||||||
new_offsets[partition] = message.offset + 1
|
|
||||||
count -= 1
|
|
||||||
else:
|
|
||||||
# Ran out of messages for the last request.
|
|
||||||
if not block:
|
|
||||||
# If we're not blocking, break.
|
|
||||||
break
|
|
||||||
if timeout is not None:
|
|
||||||
# If we're blocking and have a timeout, reduce it to the
|
|
||||||
# appropriate value
|
|
||||||
timeout = max_time - time.time()
|
|
||||||
|
|
||||||
# Update and commit offsets if necessary
|
|
||||||
self.offsets.update(new_offsets)
|
|
||||||
self.count_since_commit += len(messages)
|
|
||||||
self._auto_commit()
|
|
||||||
return messages
|
|
||||||
|
|
||||||
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
|
|
||||||
return self._get_message(block, timeout, get_partition_info)
|
|
||||||
|
|
||||||
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
|
|
||||||
update_offset=True):
|
|
||||||
"""
|
|
||||||
If no messages can be fetched, returns None.
|
|
||||||
If get_partition_info is None, it defaults to self.partition_info
|
|
||||||
If get_partition_info is True, returns (partition, message)
|
|
||||||
If get_partition_info is False, returns message
|
|
||||||
"""
|
|
||||||
if self.queue.empty():
|
|
||||||
# We're out of messages, go grab some more.
|
|
||||||
with FetchContext(self, block, timeout):
|
|
||||||
self._fetch()
|
|
||||||
try:
|
|
||||||
partition, message = self.queue.get_nowait()
|
|
||||||
|
|
||||||
if update_offset:
|
|
||||||
# Update partition offset
|
|
||||||
self.offsets[partition] = message.offset + 1
|
|
||||||
|
|
||||||
# Count, check and commit messages if necessary
|
|
||||||
self.count_since_commit += 1
|
|
||||||
self._auto_commit()
|
|
||||||
|
|
||||||
if get_partition_info is None:
|
|
||||||
get_partition_info = self.partition_info
|
|
||||||
if get_partition_info:
|
|
||||||
return partition, message
|
|
||||||
else:
|
|
||||||
return message
|
|
||||||
except Empty:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
if self.iter_timeout is None:
|
|
||||||
timeout = ITER_TIMEOUT_SECONDS
|
|
||||||
else:
|
|
||||||
timeout = self.iter_timeout
|
|
||||||
|
|
||||||
while True:
|
|
||||||
message = self.get_message(True, timeout)
|
|
||||||
if message:
|
|
||||||
yield message
|
|
||||||
elif self.iter_timeout is None:
|
|
||||||
# We did not receive any message yet but we don't have a
|
|
||||||
# timeout, so give up the CPU for a while before trying again
|
|
||||||
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
|
|
||||||
else:
|
|
||||||
# Timed out waiting for a message
|
|
||||||
break
|
|
||||||
|
|
||||||
def _fetch(self):
|
|
||||||
# Create fetch request payloads for all the partitions
|
|
||||||
partitions = dict((p, self.buffer_size)
|
|
||||||
for p in self.fetch_offsets.keys())
|
|
||||||
while partitions:
|
|
||||||
requests = []
|
|
||||||
for partition, buffer_size in six.iteritems(partitions):
|
|
||||||
requests.append(FetchRequest(self.topic, partition,
|
|
||||||
self.fetch_offsets[partition],
|
|
||||||
buffer_size))
|
|
||||||
# Send request
|
|
||||||
responses = self.client.send_fetch_request(
|
|
||||||
requests,
|
|
||||||
max_wait_time=int(self.fetch_max_wait_time),
|
|
||||||
min_bytes=self.fetch_min_bytes)
|
|
||||||
|
|
||||||
retry_partitions = {}
|
|
||||||
for resp in responses:
|
|
||||||
partition = resp.partition
|
|
||||||
buffer_size = partitions[partition]
|
|
||||||
try:
|
|
||||||
for message in resp.messages:
|
|
||||||
# Put the message in our queue
|
|
||||||
self.queue.put((partition, message))
|
|
||||||
self.fetch_offsets[partition] = message.offset + 1
|
|
||||||
except ConsumerFetchSizeTooSmall:
|
|
||||||
if (self.max_buffer_size is not None and
|
|
||||||
buffer_size == self.max_buffer_size):
|
|
||||||
log.error("Max fetch size %d too small",
|
|
||||||
self.max_buffer_size)
|
|
||||||
raise
|
|
||||||
if self.max_buffer_size is None:
|
|
||||||
buffer_size *= 2
|
|
||||||
else:
|
|
||||||
buffer_size = max(buffer_size * 2,
|
|
||||||
self.max_buffer_size)
|
|
||||||
log.warn("Fetch size too small, increase to %d (2x) "
|
|
||||||
"and retry", buffer_size)
|
|
||||||
retry_partitions[partition] = buffer_size
|
|
||||||
except ConsumerNoMoreData as e:
|
|
||||||
log.debug("Iteration was ended by %r", e)
|
|
||||||
except StopIteration:
|
|
||||||
# Stop iterating through this partition
|
|
||||||
log.debug("Done iterating over partition %s" % partition)
|
|
||||||
partitions = retry_partitions
|
|
||||||
|
|
||||||
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
|
|
||||||
"""
|
|
||||||
A child process worker which consumes messages based on the
|
|
||||||
notifications given by the controller process
|
|
||||||
|
|
||||||
NOTE: Ideally, this should have been a method inside the Consumer
|
|
||||||
class. However, multiprocessing module has issues in windows. The
|
|
||||||
functionality breaks unless this function is kept outside of a class
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Make the child processes open separate socket connections
|
|
||||||
client.reinit()
|
|
||||||
|
|
||||||
# We will start consumers without auto-commit. Auto-commit will be
|
|
||||||
# done by the master controller process.
|
|
||||||
consumer = SimpleConsumer(client, group, topic,
|
|
||||||
partitions=chunk,
|
|
||||||
auto_commit=False,
|
|
||||||
auto_commit_every_n=None,
|
|
||||||
auto_commit_every_t=None)
|
|
||||||
|
|
||||||
# Ensure that the consumer provides the partition information
|
|
||||||
consumer.provide_partition_info()
|
|
||||||
|
|
||||||
while True:
|
|
||||||
# Wait till the controller indicates us to start consumption
|
|
||||||
start.wait()
|
|
||||||
|
|
||||||
# If we are asked to quit, do so
|
|
||||||
if exit.is_set():
|
|
||||||
break
|
|
||||||
|
|
||||||
# Consume messages and add them to the queue. If the controller
|
|
||||||
# indicates a specific number of messages, follow that advice
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
message = consumer.get_message()
|
|
||||||
if message:
|
|
||||||
queue.put(message)
|
|
||||||
count += 1
|
|
||||||
|
|
||||||
# We have reached the required size. The controller might have
|
|
||||||
# more than what he needs. Wait for a while.
|
|
||||||
# Without this logic, it is possible that we run into a big
|
|
||||||
# loop consuming all available messages before the controller
|
|
||||||
# can reset the 'start' event
|
|
||||||
if count == size.value:
|
|
||||||
pause.wait()
|
|
||||||
|
|
||||||
else:
|
|
||||||
# In case we did not receive any message, give up the CPU for
|
|
||||||
# a while before we try again
|
|
||||||
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
|
|
||||||
|
|
||||||
consumer.stop()
|
|
||||||
|
|
||||||
|
|
||||||
class MultiProcessConsumer(Consumer):
|
|
||||||
"""
|
|
||||||
A consumer implementation that consumes partitions for a topic in
|
|
||||||
parallel using multiple processes
|
|
||||||
|
|
||||||
client: a connected KafkaClient
|
|
||||||
group: a name for this consumer, used for offset storage and must be unique
|
|
||||||
topic: the topic to consume
|
|
||||||
|
|
||||||
auto_commit: default True. Whether or not to auto commit the offsets
|
|
||||||
auto_commit_every_n: default 100. How many messages to consume
|
|
||||||
before a commit
|
|
||||||
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
|
||||||
wait before commit
|
|
||||||
num_procs: Number of processes to start for consuming messages.
|
|
||||||
The available partitions will be divided among these processes
|
|
||||||
partitions_per_proc: Number of partitions to be allocated per process
|
|
||||||
(overrides num_procs)
|
|
||||||
|
|
||||||
Auto commit details:
|
|
||||||
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
|
||||||
reset one another when one is triggered. These triggers simply call the
|
|
||||||
commit method on this class. A manual call to commit will also reset
|
|
||||||
these triggers
|
|
||||||
"""
|
|
||||||
def __init__(self, client, group, topic, auto_commit=True,
|
|
||||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
|
||||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
|
|
||||||
num_procs=1, partitions_per_proc=0):
|
|
||||||
|
|
||||||
# Initiate the base consumer class
|
|
||||||
super(MultiProcessConsumer, self).__init__(
|
|
||||||
client, group, topic,
|
|
||||||
partitions=None,
|
|
||||||
auto_commit=auto_commit,
|
|
||||||
auto_commit_every_n=auto_commit_every_n,
|
|
||||||
auto_commit_every_t=auto_commit_every_t)
|
|
||||||
|
|
||||||
# Variables for managing and controlling the data flow from
|
|
||||||
# consumer child process to master
|
|
||||||
self.queue = MPQueue(1024) # Child consumers dump messages into this
|
|
||||||
self.start = Event() # Indicates the consumers to start fetch
|
|
||||||
self.exit = Event() # Requests the consumers to shutdown
|
|
||||||
self.pause = Event() # Requests the consumers to pause fetch
|
|
||||||
self.size = Value('i', 0) # Indicator of number of messages to fetch
|
|
||||||
|
|
||||||
partitions = self.offsets.keys()
|
|
||||||
|
|
||||||
# If unspecified, start one consumer per partition
|
|
||||||
# The logic below ensures that
|
|
||||||
# * we do not cross the num_procs limit
|
|
||||||
# * we have an even distribution of partitions among processes
|
|
||||||
if not partitions_per_proc:
|
|
||||||
partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
|
|
||||||
if partitions_per_proc < num_procs * 0.5:
|
|
||||||
partitions_per_proc += 1
|
|
||||||
|
|
||||||
# The final set of chunks
|
|
||||||
chunker = lambda *x: [] + list(x)
|
|
||||||
chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
|
|
||||||
|
|
||||||
self.procs = []
|
|
||||||
for chunk in chunks:
|
|
||||||
chunk = filter(lambda x: x is not None, chunk)
|
|
||||||
args = (client.copy(),
|
|
||||||
group, topic, list(chunk),
|
|
||||||
self.queue, self.start, self.exit,
|
|
||||||
self.pause, self.size)
|
|
||||||
|
|
||||||
proc = Process(target=_mp_consume, args=args)
|
|
||||||
proc.daemon = True
|
|
||||||
proc.start()
|
|
||||||
self.procs.append(proc)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
|
|
||||||
(self.group, self.topic, len(self.procs))
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
# Set exit and start off all waiting consumers
|
|
||||||
self.exit.set()
|
|
||||||
self.pause.set()
|
|
||||||
self.start.set()
|
|
||||||
|
|
||||||
for proc in self.procs:
|
|
||||||
proc.join()
|
|
||||||
proc.terminate()
|
|
||||||
|
|
||||||
super(MultiProcessConsumer, self).stop()
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
"""
|
|
||||||
Iterator to consume the messages available on this consumer
|
|
||||||
"""
|
|
||||||
# Trigger the consumer procs to start off.
|
|
||||||
# We will iterate till there are no more messages available
|
|
||||||
self.size.value = 0
|
|
||||||
self.pause.set()
|
|
||||||
|
|
||||||
while True:
|
|
||||||
self.start.set()
|
|
||||||
try:
|
|
||||||
# We will block for a small while so that the consumers get
|
|
||||||
# a chance to run and put some messages in the queue
|
|
||||||
# TODO: This is a hack and will make the consumer block for
|
|
||||||
# at least one second. Need to find a better way of doing this
|
|
||||||
partition, message = self.queue.get(block=True, timeout=1)
|
|
||||||
except Empty:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Count, check and commit messages if necessary
|
|
||||||
self.offsets[partition] = message.offset + 1
|
|
||||||
self.start.clear()
|
|
||||||
self.count_since_commit += 1
|
|
||||||
self._auto_commit()
|
|
||||||
yield message
|
|
||||||
|
|
||||||
self.start.clear()
|
|
||||||
|
|
||||||
def get_messages(self, count=1, block=True, timeout=10):
|
|
||||||
"""
|
|
||||||
Fetch the specified number of messages
|
|
||||||
|
|
||||||
count: Indicates the maximum number of messages to be fetched
|
|
||||||
block: If True, the API will block till some messages are fetched.
|
|
||||||
timeout: If block is True, the function will block for the specified
|
|
||||||
time (in seconds) until count messages is fetched. If None,
|
|
||||||
it will block forever.
|
|
||||||
"""
|
|
||||||
messages = []
|
|
||||||
|
|
||||||
# Give a size hint to the consumers. Each consumer process will fetch
|
|
||||||
# a maximum of "count" messages. This will fetch more messages than
|
|
||||||
# necessary, but these will not be committed to kafka. Also, the extra
|
|
||||||
# messages can be provided in subsequent runs
|
|
||||||
self.size.value = count
|
|
||||||
self.pause.clear()
|
|
||||||
|
|
||||||
if timeout is not None:
|
|
||||||
max_time = time.time() + timeout
|
|
||||||
|
|
||||||
new_offsets = {}
|
|
||||||
while count > 0 and (timeout is None or timeout > 0):
|
|
||||||
# Trigger consumption only if the queue is empty
|
|
||||||
# By doing this, we will ensure that consumers do not
|
|
||||||
# go into overdrive and keep consuming thousands of
|
|
||||||
# messages when the user might need only a few
|
|
||||||
if self.queue.empty():
|
|
||||||
self.start.set()
|
|
||||||
|
|
||||||
try:
|
|
||||||
partition, message = self.queue.get(block, timeout)
|
|
||||||
except Empty:
|
|
||||||
break
|
|
||||||
|
|
||||||
messages.append(message)
|
|
||||||
new_offsets[partition] = message.offset + 1
|
|
||||||
count -= 1
|
|
||||||
if timeout is not None:
|
|
||||||
timeout = max_time - time.time()
|
|
||||||
|
|
||||||
self.size.value = 0
|
|
||||||
self.start.clear()
|
|
||||||
self.pause.set()
|
|
||||||
|
|
||||||
# Update and commit offsets if necessary
|
|
||||||
self.offsets.update(new_offsets)
|
|
||||||
self.count_since_commit += len(messages)
|
|
||||||
self._auto_commit()
|
|
||||||
|
|
||||||
return messages
|
|
||||||
6
kafka/consumer/__init__.py
Normal file
6
kafka/consumer/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
from .simple import SimpleConsumer
|
||||||
|
from .multiprocess import MultiProcessConsumer
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'SimpleConsumer', 'MultiProcessConsumer'
|
||||||
|
]
|
||||||
169
kafka/consumer/base.py
Normal file
169
kafka/consumer/base.py
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import numbers
|
||||||
|
from threading import Lock
|
||||||
|
|
||||||
|
import kafka.common
|
||||||
|
from kafka.common import (
|
||||||
|
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
|
||||||
|
UnknownTopicOrPartitionError
|
||||||
|
)
|
||||||
|
|
||||||
|
from kafka.util import ReentrantTimer
|
||||||
|
|
||||||
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
AUTO_COMMIT_MSG_COUNT = 100
|
||||||
|
AUTO_COMMIT_INTERVAL = 5000
|
||||||
|
|
||||||
|
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
|
||||||
|
FETCH_MAX_WAIT_TIME = 100
|
||||||
|
FETCH_MIN_BYTES = 4096
|
||||||
|
FETCH_BUFFER_SIZE_BYTES = 4096
|
||||||
|
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
|
||||||
|
|
||||||
|
ITER_TIMEOUT_SECONDS = 60
|
||||||
|
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
|
||||||
|
|
||||||
|
|
||||||
|
class Consumer(object):
|
||||||
|
"""
|
||||||
|
Base class to be used by other consumers. Not to be used directly
|
||||||
|
|
||||||
|
This base class provides logic for
|
||||||
|
* initialization and fetching metadata of partitions
|
||||||
|
* Auto-commit logic
|
||||||
|
* APIs for fetching pending message count
|
||||||
|
"""
|
||||||
|
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
|
||||||
|
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||||
|
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
|
||||||
|
|
||||||
|
self.client = client
|
||||||
|
self.topic = topic
|
||||||
|
self.group = group
|
||||||
|
self.client.load_metadata_for_topics(topic)
|
||||||
|
self.offsets = {}
|
||||||
|
|
||||||
|
if not partitions:
|
||||||
|
partitions = self.client.get_partition_ids_for_topic(topic)
|
||||||
|
else:
|
||||||
|
assert all(isinstance(x, numbers.Integral) for x in partitions)
|
||||||
|
|
||||||
|
# Variables for handling offset commits
|
||||||
|
self.commit_lock = Lock()
|
||||||
|
self.commit_timer = None
|
||||||
|
self.count_since_commit = 0
|
||||||
|
self.auto_commit = auto_commit
|
||||||
|
self.auto_commit_every_n = auto_commit_every_n
|
||||||
|
self.auto_commit_every_t = auto_commit_every_t
|
||||||
|
|
||||||
|
# Set up the auto-commit timer
|
||||||
|
if auto_commit is True and auto_commit_every_t is not None:
|
||||||
|
self.commit_timer = ReentrantTimer(auto_commit_every_t,
|
||||||
|
self.commit)
|
||||||
|
self.commit_timer.start()
|
||||||
|
|
||||||
|
if auto_commit:
|
||||||
|
self.fetch_last_known_offsets(partitions)
|
||||||
|
else:
|
||||||
|
for partition in partitions:
|
||||||
|
self.offsets[partition] = 0
|
||||||
|
|
||||||
|
def fetch_last_known_offsets(self, partitions=None):
|
||||||
|
if not partitions:
|
||||||
|
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
||||||
|
|
||||||
|
def get_or_init_offset(resp):
|
||||||
|
try:
|
||||||
|
kafka.common.check_error(resp)
|
||||||
|
return resp.offset
|
||||||
|
except UnknownTopicOrPartitionError:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
for partition in partitions:
|
||||||
|
req = OffsetFetchRequest(self.topic, partition)
|
||||||
|
(resp,) = self.client.send_offset_fetch_request(self.group, [req],
|
||||||
|
fail_on_error=False)
|
||||||
|
self.offsets[partition] = get_or_init_offset(resp)
|
||||||
|
self.fetch_offsets = self.offsets.copy()
|
||||||
|
|
||||||
|
def commit(self, partitions=None):
|
||||||
|
"""
|
||||||
|
Commit offsets for this consumer
|
||||||
|
|
||||||
|
partitions: list of partitions to commit, default is to commit
|
||||||
|
all of them
|
||||||
|
"""
|
||||||
|
|
||||||
|
# short circuit if nothing happened. This check is kept outside
|
||||||
|
# to prevent un-necessarily acquiring a lock for checking the state
|
||||||
|
if self.count_since_commit == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
with self.commit_lock:
|
||||||
|
# Do this check again, just in case the state has changed
|
||||||
|
# during the lock acquiring timeout
|
||||||
|
if self.count_since_commit == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
reqs = []
|
||||||
|
if not partitions: # commit all partitions
|
||||||
|
partitions = self.offsets.keys()
|
||||||
|
|
||||||
|
for partition in partitions:
|
||||||
|
offset = self.offsets[partition]
|
||||||
|
log.debug("Commit offset %d in SimpleConsumer: "
|
||||||
|
"group=%s, topic=%s, partition=%s" %
|
||||||
|
(offset, self.group, self.topic, partition))
|
||||||
|
|
||||||
|
reqs.append(OffsetCommitRequest(self.topic, partition,
|
||||||
|
offset, None))
|
||||||
|
|
||||||
|
resps = self.client.send_offset_commit_request(self.group, reqs)
|
||||||
|
for resp in resps:
|
||||||
|
kafka.common.check_error(resp)
|
||||||
|
|
||||||
|
self.count_since_commit = 0
|
||||||
|
|
||||||
|
def _auto_commit(self):
|
||||||
|
"""
|
||||||
|
Check if we have to commit based on number of messages and commit
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Check if we are supposed to do an auto-commit
|
||||||
|
if not self.auto_commit or self.auto_commit_every_n is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.count_since_commit >= self.auto_commit_every_n:
|
||||||
|
self.commit()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self.commit_timer is not None:
|
||||||
|
self.commit_timer.stop()
|
||||||
|
self.commit()
|
||||||
|
|
||||||
|
def pending(self, partitions=None):
|
||||||
|
"""
|
||||||
|
Gets the pending message count
|
||||||
|
|
||||||
|
partitions: list of partitions to check for, default is to check all
|
||||||
|
"""
|
||||||
|
if not partitions:
|
||||||
|
partitions = self.offsets.keys()
|
||||||
|
|
||||||
|
total = 0
|
||||||
|
reqs = []
|
||||||
|
|
||||||
|
for partition in partitions:
|
||||||
|
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||||
|
|
||||||
|
resps = self.client.send_offset_request(reqs)
|
||||||
|
for resp in resps:
|
||||||
|
partition = resp.partition
|
||||||
|
pending = resp.offsets[0]
|
||||||
|
offset = self.offsets[partition]
|
||||||
|
total += pending - offset - (1 if offset > 0 else 0)
|
||||||
|
|
||||||
|
return total
|
||||||
248
kafka/consumer/multiprocess.py
Normal file
248
kafka/consumer/multiprocess.py
Normal file
@@ -0,0 +1,248 @@
|
|||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from multiprocessing import Process, Queue as MPQueue, Event, Value
|
||||||
|
|
||||||
|
try:
|
||||||
|
from Queue import Empty
|
||||||
|
except ImportError: # python 2
|
||||||
|
from queue import Empty
|
||||||
|
|
||||||
|
from .base import (
|
||||||
|
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
|
||||||
|
NO_MESSAGES_WAIT_TIME_SECONDS
|
||||||
|
)
|
||||||
|
from .simple import Consumer, SimpleConsumer
|
||||||
|
|
||||||
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
|
||||||
|
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
|
||||||
|
"""
|
||||||
|
A child process worker which consumes messages based on the
|
||||||
|
notifications given by the controller process
|
||||||
|
|
||||||
|
NOTE: Ideally, this should have been a method inside the Consumer
|
||||||
|
class. However, multiprocessing module has issues in windows. The
|
||||||
|
functionality breaks unless this function is kept outside of a class
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Make the child processes open separate socket connections
|
||||||
|
client.reinit()
|
||||||
|
|
||||||
|
# We will start consumers without auto-commit. Auto-commit will be
|
||||||
|
# done by the master controller process.
|
||||||
|
consumer = SimpleConsumer(client, group, topic,
|
||||||
|
partitions=chunk,
|
||||||
|
auto_commit=False,
|
||||||
|
auto_commit_every_n=None,
|
||||||
|
auto_commit_every_t=None)
|
||||||
|
|
||||||
|
# Ensure that the consumer provides the partition information
|
||||||
|
consumer.provide_partition_info()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Wait till the controller indicates us to start consumption
|
||||||
|
start.wait()
|
||||||
|
|
||||||
|
# If we are asked to quit, do so
|
||||||
|
if exit.is_set():
|
||||||
|
break
|
||||||
|
|
||||||
|
# Consume messages and add them to the queue. If the controller
|
||||||
|
# indicates a specific number of messages, follow that advice
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
message = consumer.get_message()
|
||||||
|
if message:
|
||||||
|
queue.put(message)
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
# We have reached the required size. The controller might have
|
||||||
|
# more than what he needs. Wait for a while.
|
||||||
|
# Without this logic, it is possible that we run into a big
|
||||||
|
# loop consuming all available messages before the controller
|
||||||
|
# can reset the 'start' event
|
||||||
|
if count == size.value:
|
||||||
|
pause.wait()
|
||||||
|
|
||||||
|
else:
|
||||||
|
# In case we did not receive any message, give up the CPU for
|
||||||
|
# a while before we try again
|
||||||
|
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
|
||||||
|
|
||||||
|
consumer.stop()
|
||||||
|
|
||||||
|
|
||||||
|
class MultiProcessConsumer(Consumer):
|
||||||
|
"""
|
||||||
|
A consumer implementation that consumes partitions for a topic in
|
||||||
|
parallel using multiple processes
|
||||||
|
|
||||||
|
client: a connected KafkaClient
|
||||||
|
group: a name for this consumer, used for offset storage and must be unique
|
||||||
|
topic: the topic to consume
|
||||||
|
|
||||||
|
auto_commit: default True. Whether or not to auto commit the offsets
|
||||||
|
auto_commit_every_n: default 100. How many messages to consume
|
||||||
|
before a commit
|
||||||
|
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
||||||
|
wait before commit
|
||||||
|
num_procs: Number of processes to start for consuming messages.
|
||||||
|
The available partitions will be divided among these processes
|
||||||
|
partitions_per_proc: Number of partitions to be allocated per process
|
||||||
|
(overrides num_procs)
|
||||||
|
|
||||||
|
Auto commit details:
|
||||||
|
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
||||||
|
reset one another when one is triggered. These triggers simply call the
|
||||||
|
commit method on this class. A manual call to commit will also reset
|
||||||
|
these triggers
|
||||||
|
"""
|
||||||
|
def __init__(self, client, group, topic, auto_commit=True,
|
||||||
|
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||||
|
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
|
||||||
|
num_procs=1, partitions_per_proc=0):
|
||||||
|
|
||||||
|
# Initiate the base consumer class
|
||||||
|
super(MultiProcessConsumer, self).__init__(
|
||||||
|
client, group, topic,
|
||||||
|
partitions=None,
|
||||||
|
auto_commit=auto_commit,
|
||||||
|
auto_commit_every_n=auto_commit_every_n,
|
||||||
|
auto_commit_every_t=auto_commit_every_t)
|
||||||
|
|
||||||
|
# Variables for managing and controlling the data flow from
|
||||||
|
# consumer child process to master
|
||||||
|
self.queue = MPQueue(1024) # Child consumers dump messages into this
|
||||||
|
self.start = Event() # Indicates the consumers to start fetch
|
||||||
|
self.exit = Event() # Requests the consumers to shutdown
|
||||||
|
self.pause = Event() # Requests the consumers to pause fetch
|
||||||
|
self.size = Value('i', 0) # Indicator of number of messages to fetch
|
||||||
|
|
||||||
|
partitions = self.offsets.keys()
|
||||||
|
|
||||||
|
# If unspecified, start one consumer per partition
|
||||||
|
# The logic below ensures that
|
||||||
|
# * we do not cross the num_procs limit
|
||||||
|
# * we have an even distribution of partitions among processes
|
||||||
|
if not partitions_per_proc:
|
||||||
|
partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
|
||||||
|
if partitions_per_proc < num_procs * 0.5:
|
||||||
|
partitions_per_proc += 1
|
||||||
|
|
||||||
|
# The final set of chunks
|
||||||
|
chunker = lambda *x: [] + list(x)
|
||||||
|
chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
|
||||||
|
|
||||||
|
self.procs = []
|
||||||
|
for chunk in chunks:
|
||||||
|
chunk = filter(lambda x: x is not None, chunk)
|
||||||
|
args = (client.copy(),
|
||||||
|
group, topic, list(chunk),
|
||||||
|
self.queue, self.start, self.exit,
|
||||||
|
self.pause, self.size)
|
||||||
|
|
||||||
|
proc = Process(target=_mp_consume, args=args)
|
||||||
|
proc.daemon = True
|
||||||
|
proc.start()
|
||||||
|
self.procs.append(proc)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
|
||||||
|
(self.group, self.topic, len(self.procs))
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
# Set exit and start off all waiting consumers
|
||||||
|
self.exit.set()
|
||||||
|
self.pause.set()
|
||||||
|
self.start.set()
|
||||||
|
|
||||||
|
for proc in self.procs:
|
||||||
|
proc.join()
|
||||||
|
proc.terminate()
|
||||||
|
|
||||||
|
super(MultiProcessConsumer, self).stop()
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
"""
|
||||||
|
Iterator to consume the messages available on this consumer
|
||||||
|
"""
|
||||||
|
# Trigger the consumer procs to start off.
|
||||||
|
# We will iterate till there are no more messages available
|
||||||
|
self.size.value = 0
|
||||||
|
self.pause.set()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
self.start.set()
|
||||||
|
try:
|
||||||
|
# We will block for a small while so that the consumers get
|
||||||
|
# a chance to run and put some messages in the queue
|
||||||
|
# TODO: This is a hack and will make the consumer block for
|
||||||
|
# at least one second. Need to find a better way of doing this
|
||||||
|
partition, message = self.queue.get(block=True, timeout=1)
|
||||||
|
except Empty:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Count, check and commit messages if necessary
|
||||||
|
self.offsets[partition] = message.offset + 1
|
||||||
|
self.start.clear()
|
||||||
|
self.count_since_commit += 1
|
||||||
|
self._auto_commit()
|
||||||
|
yield message
|
||||||
|
|
||||||
|
self.start.clear()
|
||||||
|
|
||||||
|
def get_messages(self, count=1, block=True, timeout=10):
|
||||||
|
"""
|
||||||
|
Fetch the specified number of messages
|
||||||
|
|
||||||
|
count: Indicates the maximum number of messages to be fetched
|
||||||
|
block: If True, the API will block till some messages are fetched.
|
||||||
|
timeout: If block is True, the function will block for the specified
|
||||||
|
time (in seconds) until count messages is fetched. If None,
|
||||||
|
it will block forever.
|
||||||
|
"""
|
||||||
|
messages = []
|
||||||
|
|
||||||
|
# Give a size hint to the consumers. Each consumer process will fetch
|
||||||
|
# a maximum of "count" messages. This will fetch more messages than
|
||||||
|
# necessary, but these will not be committed to kafka. Also, the extra
|
||||||
|
# messages can be provided in subsequent runs
|
||||||
|
self.size.value = count
|
||||||
|
self.pause.clear()
|
||||||
|
|
||||||
|
if timeout is not None:
|
||||||
|
max_time = time.time() + timeout
|
||||||
|
|
||||||
|
new_offsets = {}
|
||||||
|
while count > 0 and (timeout is None or timeout > 0):
|
||||||
|
# Trigger consumption only if the queue is empty
|
||||||
|
# By doing this, we will ensure that consumers do not
|
||||||
|
# go into overdrive and keep consuming thousands of
|
||||||
|
# messages when the user might need only a few
|
||||||
|
if self.queue.empty():
|
||||||
|
self.start.set()
|
||||||
|
|
||||||
|
try:
|
||||||
|
partition, message = self.queue.get(block, timeout)
|
||||||
|
except Empty:
|
||||||
|
break
|
||||||
|
|
||||||
|
messages.append(message)
|
||||||
|
new_offsets[partition] = message.offset + 1
|
||||||
|
count -= 1
|
||||||
|
if timeout is not None:
|
||||||
|
timeout = max_time - time.time()
|
||||||
|
|
||||||
|
self.size.value = 0
|
||||||
|
self.start.clear()
|
||||||
|
self.pause.set()
|
||||||
|
|
||||||
|
# Update and commit offsets if necessary
|
||||||
|
self.offsets.update(new_offsets)
|
||||||
|
self.count_since_commit += len(messages)
|
||||||
|
self._auto_commit()
|
||||||
|
|
||||||
|
return messages
|
||||||
318
kafka/consumer/simple.py
Normal file
318
kafka/consumer/simple.py
Normal file
@@ -0,0 +1,318 @@
|
|||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
try:
|
||||||
|
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
|
||||||
|
except ImportError: # python 2
|
||||||
|
from itertools import izip_longest as izip_longest, repeat
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
|
try:
|
||||||
|
from Queue import Empty, Queue
|
||||||
|
except ImportError: # python 2
|
||||||
|
from queue import Empty, Queue
|
||||||
|
|
||||||
|
from kafka.common import (
|
||||||
|
FetchRequest, OffsetRequest,
|
||||||
|
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
|
||||||
|
)
|
||||||
|
from .base import (
|
||||||
|
Consumer,
|
||||||
|
FETCH_DEFAULT_BLOCK_TIMEOUT,
|
||||||
|
AUTO_COMMIT_MSG_COUNT,
|
||||||
|
AUTO_COMMIT_INTERVAL,
|
||||||
|
FETCH_MIN_BYTES,
|
||||||
|
FETCH_BUFFER_SIZE_BYTES,
|
||||||
|
MAX_FETCH_BUFFER_SIZE_BYTES,
|
||||||
|
FETCH_MAX_WAIT_TIME,
|
||||||
|
ITER_TIMEOUT_SECONDS,
|
||||||
|
NO_MESSAGES_WAIT_TIME_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
class FetchContext(object):
|
||||||
|
"""
|
||||||
|
Class for managing the state of a consumer during fetch
|
||||||
|
"""
|
||||||
|
def __init__(self, consumer, block, timeout):
|
||||||
|
self.consumer = consumer
|
||||||
|
self.block = block
|
||||||
|
|
||||||
|
if block:
|
||||||
|
if not timeout:
|
||||||
|
timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
|
||||||
|
self.timeout = timeout * 1000
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
"""Set fetch values based on blocking status"""
|
||||||
|
self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
|
||||||
|
self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
|
||||||
|
if self.block:
|
||||||
|
self.consumer.fetch_max_wait_time = self.timeout
|
||||||
|
self.consumer.fetch_min_bytes = 1
|
||||||
|
else:
|
||||||
|
self.consumer.fetch_min_bytes = 0
|
||||||
|
|
||||||
|
def __exit__(self, type, value, traceback):
|
||||||
|
"""Reset values"""
|
||||||
|
self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
|
||||||
|
self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleConsumer(Consumer):
|
||||||
|
"""
|
||||||
|
A simple consumer implementation that consumes all/specified partitions
|
||||||
|
for a topic
|
||||||
|
|
||||||
|
client: a connected KafkaClient
|
||||||
|
group: a name for this consumer, used for offset storage and must be unique
|
||||||
|
topic: the topic to consume
|
||||||
|
partitions: An optional list of partitions to consume the data from
|
||||||
|
|
||||||
|
auto_commit: default True. Whether or not to auto commit the offsets
|
||||||
|
auto_commit_every_n: default 100. How many messages to consume
|
||||||
|
before a commit
|
||||||
|
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
||||||
|
wait before commit
|
||||||
|
fetch_size_bytes: number of bytes to request in a FetchRequest
|
||||||
|
buffer_size: default 4K. Initial number of bytes to tell kafka we
|
||||||
|
have available. This will double as needed.
|
||||||
|
max_buffer_size: default 16K. Max number of bytes to tell kafka we have
|
||||||
|
available. None means no limit.
|
||||||
|
iter_timeout: default None. How much time (in seconds) to wait for a
|
||||||
|
message in the iterator before exiting. None means no
|
||||||
|
timeout, so it will wait forever.
|
||||||
|
|
||||||
|
Auto commit details:
|
||||||
|
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
||||||
|
reset one another when one is triggered. These triggers simply call the
|
||||||
|
commit method on this class. A manual call to commit will also reset
|
||||||
|
these triggers
|
||||||
|
"""
|
||||||
|
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
|
||||||
|
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||||
|
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
|
||||||
|
fetch_size_bytes=FETCH_MIN_BYTES,
|
||||||
|
buffer_size=FETCH_BUFFER_SIZE_BYTES,
|
||||||
|
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
|
||||||
|
iter_timeout=None):
|
||||||
|
super(SimpleConsumer, self).__init__(
|
||||||
|
client, group, topic,
|
||||||
|
partitions=partitions,
|
||||||
|
auto_commit=auto_commit,
|
||||||
|
auto_commit_every_n=auto_commit_every_n,
|
||||||
|
auto_commit_every_t=auto_commit_every_t)
|
||||||
|
|
||||||
|
if max_buffer_size is not None and buffer_size > max_buffer_size:
|
||||||
|
raise ValueError("buffer_size (%d) is greater than "
|
||||||
|
"max_buffer_size (%d)" %
|
||||||
|
(buffer_size, max_buffer_size))
|
||||||
|
self.buffer_size = buffer_size
|
||||||
|
self.max_buffer_size = max_buffer_size
|
||||||
|
self.partition_info = False # Do not return partition info in msgs
|
||||||
|
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
|
||||||
|
self.fetch_min_bytes = fetch_size_bytes
|
||||||
|
self.fetch_offsets = self.offsets.copy()
|
||||||
|
self.iter_timeout = iter_timeout
|
||||||
|
self.queue = Queue()
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
|
||||||
|
(self.group, self.topic, str(self.offsets.keys()))
|
||||||
|
|
||||||
|
def provide_partition_info(self):
|
||||||
|
"""
|
||||||
|
Indicates that partition info must be returned by the consumer
|
||||||
|
"""
|
||||||
|
self.partition_info = True
|
||||||
|
|
||||||
|
def seek(self, offset, whence):
|
||||||
|
"""
|
||||||
|
Alter the current offset in the consumer, similar to fseek
|
||||||
|
|
||||||
|
offset: how much to modify the offset
|
||||||
|
whence: where to modify it from
|
||||||
|
0 is relative to the earliest available offset (head)
|
||||||
|
1 is relative to the current offset
|
||||||
|
2 is relative to the latest known offset (tail)
|
||||||
|
"""
|
||||||
|
|
||||||
|
if whence == 1: # relative to current position
|
||||||
|
for partition, _offset in self.offsets.items():
|
||||||
|
self.offsets[partition] = _offset + offset
|
||||||
|
elif whence in (0, 2): # relative to beginning or end
|
||||||
|
# divide the request offset by number of partitions,
|
||||||
|
# distribute the remained evenly
|
||||||
|
(delta, rem) = divmod(offset, len(self.offsets))
|
||||||
|
deltas = {}
|
||||||
|
for partition, r in izip_longest(self.offsets.keys(),
|
||||||
|
repeat(1, rem), fillvalue=0):
|
||||||
|
deltas[partition] = delta + r
|
||||||
|
|
||||||
|
reqs = []
|
||||||
|
for partition in self.offsets.keys():
|
||||||
|
if whence == 0:
|
||||||
|
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
|
||||||
|
elif whence == 2:
|
||||||
|
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
resps = self.client.send_offset_request(reqs)
|
||||||
|
for resp in resps:
|
||||||
|
self.offsets[resp.partition] = \
|
||||||
|
resp.offsets[0] + deltas[resp.partition]
|
||||||
|
else:
|
||||||
|
raise ValueError("Unexpected value for `whence`, %d" % whence)
|
||||||
|
|
||||||
|
# Reset queue and fetch offsets since they are invalid
|
||||||
|
self.fetch_offsets = self.offsets.copy()
|
||||||
|
if self.auto_commit:
|
||||||
|
self.count_since_commit += 1
|
||||||
|
self.commit()
|
||||||
|
|
||||||
|
self.queue = Queue()
|
||||||
|
|
||||||
|
def get_messages(self, count=1, block=True, timeout=0.1):
|
||||||
|
"""
|
||||||
|
Fetch the specified number of messages
|
||||||
|
|
||||||
|
count: Indicates the maximum number of messages to be fetched
|
||||||
|
block: If True, the API will block till some messages are fetched.
|
||||||
|
timeout: If block is True, the function will block for the specified
|
||||||
|
time (in seconds) until count messages is fetched. If None,
|
||||||
|
it will block forever.
|
||||||
|
"""
|
||||||
|
messages = []
|
||||||
|
if timeout is not None:
|
||||||
|
max_time = time.time() + timeout
|
||||||
|
|
||||||
|
new_offsets = {}
|
||||||
|
while count > 0 and (timeout is None or timeout > 0):
|
||||||
|
result = self._get_message(block, timeout, get_partition_info=True,
|
||||||
|
update_offset=False)
|
||||||
|
if result:
|
||||||
|
partition, message = result
|
||||||
|
if self.partition_info:
|
||||||
|
messages.append(result)
|
||||||
|
else:
|
||||||
|
messages.append(message)
|
||||||
|
new_offsets[partition] = message.offset + 1
|
||||||
|
count -= 1
|
||||||
|
else:
|
||||||
|
# Ran out of messages for the last request.
|
||||||
|
if not block:
|
||||||
|
# If we're not blocking, break.
|
||||||
|
break
|
||||||
|
if timeout is not None:
|
||||||
|
# If we're blocking and have a timeout, reduce it to the
|
||||||
|
# appropriate value
|
||||||
|
timeout = max_time - time.time()
|
||||||
|
|
||||||
|
# Update and commit offsets if necessary
|
||||||
|
self.offsets.update(new_offsets)
|
||||||
|
self.count_since_commit += len(messages)
|
||||||
|
self._auto_commit()
|
||||||
|
return messages
|
||||||
|
|
||||||
|
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
|
||||||
|
return self._get_message(block, timeout, get_partition_info)
|
||||||
|
|
||||||
|
def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
|
||||||
|
update_offset=True):
|
||||||
|
"""
|
||||||
|
If no messages can be fetched, returns None.
|
||||||
|
If get_partition_info is None, it defaults to self.partition_info
|
||||||
|
If get_partition_info is True, returns (partition, message)
|
||||||
|
If get_partition_info is False, returns message
|
||||||
|
"""
|
||||||
|
if self.queue.empty():
|
||||||
|
# We're out of messages, go grab some more.
|
||||||
|
with FetchContext(self, block, timeout):
|
||||||
|
self._fetch()
|
||||||
|
try:
|
||||||
|
partition, message = self.queue.get_nowait()
|
||||||
|
|
||||||
|
if update_offset:
|
||||||
|
# Update partition offset
|
||||||
|
self.offsets[partition] = message.offset + 1
|
||||||
|
|
||||||
|
# Count, check and commit messages if necessary
|
||||||
|
self.count_since_commit += 1
|
||||||
|
self._auto_commit()
|
||||||
|
|
||||||
|
if get_partition_info is None:
|
||||||
|
get_partition_info = self.partition_info
|
||||||
|
if get_partition_info:
|
||||||
|
return partition, message
|
||||||
|
else:
|
||||||
|
return message
|
||||||
|
except Empty:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
if self.iter_timeout is None:
|
||||||
|
timeout = ITER_TIMEOUT_SECONDS
|
||||||
|
else:
|
||||||
|
timeout = self.iter_timeout
|
||||||
|
|
||||||
|
while True:
|
||||||
|
message = self.get_message(True, timeout)
|
||||||
|
if message:
|
||||||
|
yield message
|
||||||
|
elif self.iter_timeout is None:
|
||||||
|
# We did not receive any message yet but we don't have a
|
||||||
|
# timeout, so give up the CPU for a while before trying again
|
||||||
|
time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
|
||||||
|
else:
|
||||||
|
# Timed out waiting for a message
|
||||||
|
break
|
||||||
|
|
||||||
|
def _fetch(self):
|
||||||
|
# Create fetch request payloads for all the partitions
|
||||||
|
partitions = dict((p, self.buffer_size)
|
||||||
|
for p in self.fetch_offsets.keys())
|
||||||
|
while partitions:
|
||||||
|
requests = []
|
||||||
|
for partition, buffer_size in six.iteritems(partitions):
|
||||||
|
requests.append(FetchRequest(self.topic, partition,
|
||||||
|
self.fetch_offsets[partition],
|
||||||
|
buffer_size))
|
||||||
|
# Send request
|
||||||
|
responses = self.client.send_fetch_request(
|
||||||
|
requests,
|
||||||
|
max_wait_time=int(self.fetch_max_wait_time),
|
||||||
|
min_bytes=self.fetch_min_bytes)
|
||||||
|
|
||||||
|
retry_partitions = {}
|
||||||
|
for resp in responses:
|
||||||
|
partition = resp.partition
|
||||||
|
buffer_size = partitions[partition]
|
||||||
|
try:
|
||||||
|
for message in resp.messages:
|
||||||
|
# Put the message in our queue
|
||||||
|
self.queue.put((partition, message))
|
||||||
|
self.fetch_offsets[partition] = message.offset + 1
|
||||||
|
except ConsumerFetchSizeTooSmall:
|
||||||
|
if (self.max_buffer_size is not None and
|
||||||
|
buffer_size == self.max_buffer_size):
|
||||||
|
log.error("Max fetch size %d too small",
|
||||||
|
self.max_buffer_size)
|
||||||
|
raise
|
||||||
|
if self.max_buffer_size is None:
|
||||||
|
buffer_size *= 2
|
||||||
|
else:
|
||||||
|
buffer_size = max(buffer_size * 2,
|
||||||
|
self.max_buffer_size)
|
||||||
|
log.warn("Fetch size too small, increase to %d (2x) "
|
||||||
|
"and retry", buffer_size)
|
||||||
|
retry_partitions[partition] = buffer_size
|
||||||
|
except ConsumerNoMoreData as e:
|
||||||
|
log.debug("Iteration was ended by %r", e)
|
||||||
|
except StopIteration:
|
||||||
|
# Stop iterating through this partition
|
||||||
|
log.debug("Done iterating over partition %s" % partition)
|
||||||
|
partitions = retry_partitions
|
||||||
@@ -1,58 +0,0 @@
|
|||||||
from itertools import cycle
|
|
||||||
|
|
||||||
|
|
||||||
class Partitioner(object):
|
|
||||||
"""
|
|
||||||
Base class for a partitioner
|
|
||||||
"""
|
|
||||||
def __init__(self, partitions):
|
|
||||||
"""
|
|
||||||
Initialize the partitioner
|
|
||||||
|
|
||||||
partitions - A list of available partitions (during startup)
|
|
||||||
"""
|
|
||||||
self.partitions = partitions
|
|
||||||
|
|
||||||
def partition(self, key, partitions):
|
|
||||||
"""
|
|
||||||
Takes a string key and num_partitions as argument and returns
|
|
||||||
a partition to be used for the message
|
|
||||||
|
|
||||||
partitions - The list of partitions is passed in every call. This
|
|
||||||
may look like an overhead, but it will be useful
|
|
||||||
(in future) when we handle cases like rebalancing
|
|
||||||
"""
|
|
||||||
raise NotImplementedError('partition function has to be implemented')
|
|
||||||
|
|
||||||
|
|
||||||
class RoundRobinPartitioner(Partitioner):
|
|
||||||
"""
|
|
||||||
Implements a round robin partitioner which sends data to partitions
|
|
||||||
in a round robin fashion
|
|
||||||
"""
|
|
||||||
def __init__(self, partitions):
|
|
||||||
super(RoundRobinPartitioner, self).__init__(partitions)
|
|
||||||
self.iterpart = cycle(partitions)
|
|
||||||
|
|
||||||
def _set_partitions(self, partitions):
|
|
||||||
self.partitions = partitions
|
|
||||||
self.iterpart = cycle(partitions)
|
|
||||||
|
|
||||||
def partition(self, key, partitions):
|
|
||||||
# Refresh the partition list if necessary
|
|
||||||
if self.partitions != partitions:
|
|
||||||
self._set_partitions(partitions)
|
|
||||||
|
|
||||||
return next(self.iterpart)
|
|
||||||
|
|
||||||
|
|
||||||
class HashedPartitioner(Partitioner):
|
|
||||||
"""
|
|
||||||
Implements a partitioner which selects the target partition based on
|
|
||||||
the hash of the key
|
|
||||||
"""
|
|
||||||
def partition(self, key, partitions):
|
|
||||||
size = len(partitions)
|
|
||||||
idx = hash(key) % size
|
|
||||||
|
|
||||||
return partitions[idx]
|
|
||||||
6
kafka/partitioner/__init__.py
Normal file
6
kafka/partitioner/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
from .roundrobin import RoundRobinPartitioner
|
||||||
|
from .hashed import HashedPartitioner
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'RoundRobinPartitioner', 'HashedPartitioner'
|
||||||
|
]
|
||||||
23
kafka/partitioner/base.py
Normal file
23
kafka/partitioner/base.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
|
||||||
|
class Partitioner(object):
|
||||||
|
"""
|
||||||
|
Base class for a partitioner
|
||||||
|
"""
|
||||||
|
def __init__(self, partitions):
|
||||||
|
"""
|
||||||
|
Initialize the partitioner
|
||||||
|
|
||||||
|
partitions - A list of available partitions (during startup)
|
||||||
|
"""
|
||||||
|
self.partitions = partitions
|
||||||
|
|
||||||
|
def partition(self, key, partitions):
|
||||||
|
"""
|
||||||
|
Takes a string key and num_partitions as argument and returns
|
||||||
|
a partition to be used for the message
|
||||||
|
|
||||||
|
partitions - The list of partitions is passed in every call. This
|
||||||
|
may look like an overhead, but it will be useful
|
||||||
|
(in future) when we handle cases like rebalancing
|
||||||
|
"""
|
||||||
|
raise NotImplementedError('partition function has to be implemented')
|
||||||
12
kafka/partitioner/hashed.py
Normal file
12
kafka/partitioner/hashed.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
from .base import Partitioner
|
||||||
|
|
||||||
|
class HashedPartitioner(Partitioner):
|
||||||
|
"""
|
||||||
|
Implements a partitioner which selects the target partition based on
|
||||||
|
the hash of the key
|
||||||
|
"""
|
||||||
|
def partition(self, key, partitions):
|
||||||
|
size = len(partitions)
|
||||||
|
idx = hash(key) % size
|
||||||
|
|
||||||
|
return partitions[idx]
|
||||||
23
kafka/partitioner/roundrobin.py
Normal file
23
kafka/partitioner/roundrobin.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
from itertools import cycle
|
||||||
|
|
||||||
|
from .base import Partitioner
|
||||||
|
|
||||||
|
class RoundRobinPartitioner(Partitioner):
|
||||||
|
"""
|
||||||
|
Implements a round robin partitioner which sends data to partitions
|
||||||
|
in a round robin fashion
|
||||||
|
"""
|
||||||
|
def __init__(self, partitions):
|
||||||
|
super(RoundRobinPartitioner, self).__init__(partitions)
|
||||||
|
self.iterpart = cycle(partitions)
|
||||||
|
|
||||||
|
def _set_partitions(self, partitions):
|
||||||
|
self.partitions = partitions
|
||||||
|
self.iterpart = cycle(partitions)
|
||||||
|
|
||||||
|
def partition(self, key, partitions):
|
||||||
|
# Refresh the partition list if necessary
|
||||||
|
if self.partitions != partitions:
|
||||||
|
self._set_partitions(partitions)
|
||||||
|
|
||||||
|
return next(self.iterpart)
|
||||||
6
kafka/producer/__init__.py
Normal file
6
kafka/producer/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
from .simple import SimpleProducer
|
||||||
|
from .keyed import KeyedProducer
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'SimpleProducer', 'KeyedProducer'
|
||||||
|
]
|
||||||
@@ -2,23 +2,19 @@ from __future__ import absolute_import
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import random
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from Queue import Empty
|
from Queue import Empty
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from itertools import cycle
|
|
||||||
from multiprocessing import Queue, Process
|
from multiprocessing import Queue, Process
|
||||||
|
|
||||||
import six
|
import six
|
||||||
from six.moves import xrange
|
|
||||||
|
|
||||||
from kafka.common import (
|
from kafka.common import (
|
||||||
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
|
ProduceRequest, TopicAndPartition, UnsupportedCodecError
|
||||||
)
|
)
|
||||||
from kafka.partitioner import HashedPartitioner
|
|
||||||
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
|
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
|
||||||
|
|
||||||
log = logging.getLogger("kafka")
|
log = logging.getLogger("kafka")
|
||||||
@@ -208,112 +204,3 @@ class Producer(object):
|
|||||||
|
|
||||||
if self.proc.is_alive():
|
if self.proc.is_alive():
|
||||||
self.proc.terminate()
|
self.proc.terminate()
|
||||||
|
|
||||||
|
|
||||||
class SimpleProducer(Producer):
|
|
||||||
"""
|
|
||||||
A simple, round-robin producer. Each message goes to exactly one partition
|
|
||||||
|
|
||||||
Params:
|
|
||||||
client - The Kafka client instance to use
|
|
||||||
async - If True, the messages are sent asynchronously via another
|
|
||||||
thread (process). We will not wait for a response to these
|
|
||||||
req_acks - A value indicating the acknowledgements that the server must
|
|
||||||
receive before responding to the request
|
|
||||||
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
|
|
||||||
for an acknowledgement
|
|
||||||
batch_send - If True, messages are send in batches
|
|
||||||
batch_send_every_n - If set, messages are send in batches of this size
|
|
||||||
batch_send_every_t - If set, messages are send after this timeout
|
|
||||||
random_start - If true, randomize the initial partition which the
|
|
||||||
the first message block will be published to, otherwise
|
|
||||||
if false, the first message block will always publish
|
|
||||||
to partition 0 before cycling through each partition
|
|
||||||
"""
|
|
||||||
def __init__(self, client, async=False,
|
|
||||||
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
|
|
||||||
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
|
|
||||||
codec=None,
|
|
||||||
batch_send=False,
|
|
||||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
|
||||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
|
||||||
random_start=False):
|
|
||||||
self.partition_cycles = {}
|
|
||||||
self.random_start = random_start
|
|
||||||
super(SimpleProducer, self).__init__(client, async, req_acks,
|
|
||||||
ack_timeout, codec, batch_send,
|
|
||||||
batch_send_every_n,
|
|
||||||
batch_send_every_t)
|
|
||||||
|
|
||||||
def _next_partition(self, topic):
|
|
||||||
if topic not in self.partition_cycles:
|
|
||||||
if not self.client.has_metadata_for_topic(topic):
|
|
||||||
self.client.load_metadata_for_topics(topic)
|
|
||||||
|
|
||||||
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
|
|
||||||
|
|
||||||
# Randomize the initial partition that is returned
|
|
||||||
if self.random_start:
|
|
||||||
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
|
|
||||||
for _ in xrange(random.randint(0, num_partitions-1)):
|
|
||||||
next(self.partition_cycles[topic])
|
|
||||||
|
|
||||||
return next(self.partition_cycles[topic])
|
|
||||||
|
|
||||||
def send_messages(self, topic, *msg):
|
|
||||||
partition = self._next_partition(topic)
|
|
||||||
return super(SimpleProducer, self).send_messages(topic, partition, *msg)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<SimpleProducer batch=%s>' % self.async
|
|
||||||
|
|
||||||
|
|
||||||
class KeyedProducer(Producer):
|
|
||||||
"""
|
|
||||||
A producer which distributes messages to partitions based on the key
|
|
||||||
|
|
||||||
Args:
|
|
||||||
client - The kafka client instance
|
|
||||||
partitioner - A partitioner class that will be used to get the partition
|
|
||||||
to send the message to. Must be derived from Partitioner
|
|
||||||
async - If True, the messages are sent asynchronously via another
|
|
||||||
thread (process). We will not wait for a response to these
|
|
||||||
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
|
|
||||||
for an acknowledgement
|
|
||||||
batch_send - If True, messages are send in batches
|
|
||||||
batch_send_every_n - If set, messages are send in batches of this size
|
|
||||||
batch_send_every_t - If set, messages are send after this timeout
|
|
||||||
"""
|
|
||||||
def __init__(self, client, partitioner=None, async=False,
|
|
||||||
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
|
|
||||||
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
|
|
||||||
codec=None,
|
|
||||||
batch_send=False,
|
|
||||||
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
|
||||||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
|
|
||||||
if not partitioner:
|
|
||||||
partitioner = HashedPartitioner
|
|
||||||
self.partitioner_class = partitioner
|
|
||||||
self.partitioners = {}
|
|
||||||
|
|
||||||
super(KeyedProducer, self).__init__(client, async, req_acks,
|
|
||||||
ack_timeout, codec, batch_send,
|
|
||||||
batch_send_every_n,
|
|
||||||
batch_send_every_t)
|
|
||||||
|
|
||||||
def _next_partition(self, topic, key):
|
|
||||||
if topic not in self.partitioners:
|
|
||||||
if not self.client.has_metadata_for_topic(topic):
|
|
||||||
self.client.load_metadata_for_topics(topic)
|
|
||||||
|
|
||||||
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
|
|
||||||
|
|
||||||
partitioner = self.partitioners[topic]
|
|
||||||
return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
|
|
||||||
|
|
||||||
def send(self, topic, key, msg):
|
|
||||||
partition = self._next_partition(topic, key)
|
|
||||||
return self.send_messages(topic, partition, msg)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<KeyedProducer batch=%s>' % self.async
|
|
||||||
62
kafka/producer/keyed.py
Normal file
62
kafka/producer/keyed.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from kafka.partitioner import HashedPartitioner
|
||||||
|
from .base import (
|
||||||
|
Producer, BATCH_SEND_DEFAULT_INTERVAL,
|
||||||
|
BATCH_SEND_MSG_COUNT
|
||||||
|
)
|
||||||
|
|
||||||
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
|
||||||
|
class KeyedProducer(Producer):
|
||||||
|
"""
|
||||||
|
A producer which distributes messages to partitions based on the key
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client - The kafka client instance
|
||||||
|
partitioner - A partitioner class that will be used to get the partition
|
||||||
|
to send the message to. Must be derived from Partitioner
|
||||||
|
async - If True, the messages are sent asynchronously via another
|
||||||
|
thread (process). We will not wait for a response to these
|
||||||
|
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
|
||||||
|
for an acknowledgement
|
||||||
|
batch_send - If True, messages are send in batches
|
||||||
|
batch_send_every_n - If set, messages are send in batches of this size
|
||||||
|
batch_send_every_t - If set, messages are send after this timeout
|
||||||
|
"""
|
||||||
|
def __init__(self, client, partitioner=None, async=False,
|
||||||
|
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
|
||||||
|
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
|
||||||
|
codec=None,
|
||||||
|
batch_send=False,
|
||||||
|
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||||
|
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
|
||||||
|
if not partitioner:
|
||||||
|
partitioner = HashedPartitioner
|
||||||
|
self.partitioner_class = partitioner
|
||||||
|
self.partitioners = {}
|
||||||
|
|
||||||
|
super(KeyedProducer, self).__init__(client, async, req_acks,
|
||||||
|
ack_timeout, codec, batch_send,
|
||||||
|
batch_send_every_n,
|
||||||
|
batch_send_every_t)
|
||||||
|
|
||||||
|
def _next_partition(self, topic, key):
|
||||||
|
if topic not in self.partitioners:
|
||||||
|
if not self.client.has_metadata_for_topic(topic):
|
||||||
|
self.client.load_metadata_for_topics(topic)
|
||||||
|
|
||||||
|
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
|
||||||
|
|
||||||
|
partitioner = self.partitioners[topic]
|
||||||
|
return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
|
||||||
|
|
||||||
|
def send(self, topic, key, msg):
|
||||||
|
partition = self._next_partition(topic, key)
|
||||||
|
return self.send_messages(topic, partition, msg)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '<KeyedProducer batch=%s>' % self.async
|
||||||
73
kafka/producer/simple.py
Normal file
73
kafka/producer/simple.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
|
||||||
|
from itertools import cycle
|
||||||
|
|
||||||
|
from six.moves import xrange
|
||||||
|
|
||||||
|
from .base import (
|
||||||
|
Producer, BATCH_SEND_DEFAULT_INTERVAL,
|
||||||
|
BATCH_SEND_MSG_COUNT
|
||||||
|
)
|
||||||
|
|
||||||
|
log = logging.getLogger("kafka")
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleProducer(Producer):
|
||||||
|
"""
|
||||||
|
A simple, round-robin producer. Each message goes to exactly one partition
|
||||||
|
|
||||||
|
Params:
|
||||||
|
client - The Kafka client instance to use
|
||||||
|
async - If True, the messages are sent asynchronously via another
|
||||||
|
thread (process). We will not wait for a response to these
|
||||||
|
req_acks - A value indicating the acknowledgements that the server must
|
||||||
|
receive before responding to the request
|
||||||
|
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
|
||||||
|
for an acknowledgement
|
||||||
|
batch_send - If True, messages are send in batches
|
||||||
|
batch_send_every_n - If set, messages are send in batches of this size
|
||||||
|
batch_send_every_t - If set, messages are send after this timeout
|
||||||
|
random_start - If true, randomize the initial partition which the
|
||||||
|
the first message block will be published to, otherwise
|
||||||
|
if false, the first message block will always publish
|
||||||
|
to partition 0 before cycling through each partition
|
||||||
|
"""
|
||||||
|
def __init__(self, client, async=False,
|
||||||
|
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
|
||||||
|
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
|
||||||
|
codec=None,
|
||||||
|
batch_send=False,
|
||||||
|
batch_send_every_n=BATCH_SEND_MSG_COUNT,
|
||||||
|
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
|
||||||
|
random_start=False):
|
||||||
|
self.partition_cycles = {}
|
||||||
|
self.random_start = random_start
|
||||||
|
super(SimpleProducer, self).__init__(client, async, req_acks,
|
||||||
|
ack_timeout, codec, batch_send,
|
||||||
|
batch_send_every_n,
|
||||||
|
batch_send_every_t)
|
||||||
|
|
||||||
|
def _next_partition(self, topic):
|
||||||
|
if topic not in self.partition_cycles:
|
||||||
|
if not self.client.has_metadata_for_topic(topic):
|
||||||
|
self.client.load_metadata_for_topics(topic)
|
||||||
|
|
||||||
|
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
|
||||||
|
|
||||||
|
# Randomize the initial partition that is returned
|
||||||
|
if self.random_start:
|
||||||
|
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
|
||||||
|
for _ in xrange(random.randint(0, num_partitions-1)):
|
||||||
|
next(self.partition_cycles[topic])
|
||||||
|
|
||||||
|
return next(self.partition_cycles[topic])
|
||||||
|
|
||||||
|
def send_messages(self, topic, *msg):
|
||||||
|
partition = self._next_partition(topic)
|
||||||
|
return super(SimpleProducer, self).send_messages(topic, partition, *msg)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '<SimpleProducer batch=%s>' % self.async
|
||||||
@@ -4,7 +4,7 @@ from six.moves import xrange
|
|||||||
|
|
||||||
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
|
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
|
||||||
from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
|
from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
|
||||||
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
|
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
|
||||||
|
|
||||||
from test.fixtures import ZookeeperFixture, KafkaFixture
|
from test.fixtures import ZookeeperFixture, KafkaFixture
|
||||||
from test.testutil import (
|
from test.testutil import (
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from . import unittest
|
|||||||
|
|
||||||
from kafka import KafkaClient, SimpleConsumer
|
from kafka import KafkaClient, SimpleConsumer
|
||||||
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
|
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
|
||||||
from kafka.producer import Producer
|
from kafka.producer.base import Producer
|
||||||
|
|
||||||
from test.fixtures import ZookeeperFixture, KafkaFixture
|
from test.fixtures import ZookeeperFixture, KafkaFixture
|
||||||
from test.testutil import (
|
from test.testutil import (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import logging
|
|||||||
from mock import MagicMock
|
from mock import MagicMock
|
||||||
from . import unittest
|
from . import unittest
|
||||||
|
|
||||||
from kafka.producer import Producer
|
from kafka.producer.base import Producer
|
||||||
|
|
||||||
class TestKafkaProducer(unittest.TestCase):
|
class TestKafkaProducer(unittest.TestCase):
|
||||||
def test_producer_message_types(self):
|
def test_producer_message_types(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user