Merge branch 'issue-35'
Conflicts: kafka/__init__.py kafka/consumer.py test/test_integration.py
This commit is contained in:
18
README.md
18
README.md
@@ -89,6 +89,24 @@ producer.send("key2", "this methode")
|
||||
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
|
||||
```
|
||||
|
||||
## Multiprocess consumer
|
||||
```python
|
||||
from kafka.consume import MultiProcessConsumer
|
||||
|
||||
# This will split the number of partitions among two processes
|
||||
consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2)
|
||||
|
||||
# This will spawn processes such that each handles 2 partitions max
|
||||
consumer = MultiProcessConsumer(kafka, "my-topic", "my-group",
|
||||
partitions_per_proc=2)
|
||||
|
||||
for message in consumer:
|
||||
print(message)
|
||||
|
||||
for message in consumer.get_messages(count=5, block=True, timeout=4):
|
||||
print(message)
|
||||
```
|
||||
|
||||
## Low level
|
||||
|
||||
```python
|
||||
|
||||
@@ -11,10 +11,11 @@ from kafka.protocol import (
|
||||
)
|
||||
from kafka.producer import SimpleProducer, KeyedProducer
|
||||
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
|
||||
from kafka.consumer import SimpleConsumer
|
||||
from kafka.consumer import SimpleConsumer, MultiProcessConsumer
|
||||
|
||||
__all__ = [
|
||||
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
|
||||
'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
|
||||
'create_message', 'create_gzip_message', 'create_snappy_message'
|
||||
'MultiProcessConsumer', 'create_message', 'create_gzip_message',
|
||||
'create_snappy_message'
|
||||
]
|
||||
|
||||
@@ -180,6 +180,10 @@ class KafkaClient(object):
|
||||
for conn in self.conns.values():
|
||||
conn.close()
|
||||
|
||||
def reinit(self):
|
||||
for conn in self.conns.values():
|
||||
conn.reinit()
|
||||
|
||||
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
|
||||
fail_on_error=True, callback=None):
|
||||
"""
|
||||
@@ -230,15 +234,19 @@ class KafkaClient(object):
|
||||
return out
|
||||
|
||||
def send_fetch_request(self, payloads=[], fail_on_error=True,
|
||||
callback=None):
|
||||
callback=None, max_wait_time=100, min_bytes=4096):
|
||||
"""
|
||||
Encode and send a FetchRequest
|
||||
|
||||
Payloads are grouped by topic and partition so they can be pipelined
|
||||
to the same brokers.
|
||||
"""
|
||||
resps = self._send_broker_aware_request(payloads,
|
||||
KafkaProtocol.encode_fetch_request,
|
||||
|
||||
encoder = partial(KafkaProtocol.encode_fetch_request,
|
||||
max_wait_time=max_wait_time,
|
||||
min_bytes=min_bytes)
|
||||
|
||||
resps = self._send_broker_aware_request(payloads, encoder,
|
||||
KafkaProtocol.decode_fetch_response)
|
||||
|
||||
out = []
|
||||
|
||||
@@ -86,3 +86,12 @@ class KafkaConnection(local):
|
||||
def close(self):
|
||||
"Close this connection"
|
||||
self._sock.close()
|
||||
|
||||
def reinit(self):
|
||||
"""
|
||||
Re-initialize the socket connection
|
||||
"""
|
||||
self._sock.close()
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._sock.connect((self.host, self.port))
|
||||
self._sock.settimeout(10)
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
from collections import defaultdict
|
||||
from itertools import izip_longest, repeat
|
||||
import logging
|
||||
import time
|
||||
from threading import Lock
|
||||
from multiprocessing import Process, Queue, Event, Value
|
||||
from Queue import Empty
|
||||
|
||||
from kafka.common import (
|
||||
ErrorMapping, FetchRequest,
|
||||
@@ -16,37 +20,60 @@ log = logging.getLogger("kafka")
|
||||
AUTO_COMMIT_MSG_COUNT = 100
|
||||
AUTO_COMMIT_INTERVAL = 5000
|
||||
|
||||
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
|
||||
FETCH_MAX_WAIT_TIME = 100
|
||||
FETCH_MIN_BYTES = 4096
|
||||
|
||||
class SimpleConsumer(object):
|
||||
|
||||
class FetchContext(object):
|
||||
"""
|
||||
A simple consumer implementation that consumes all partitions for a topic
|
||||
|
||||
client: a connected KafkaClient
|
||||
group: a name for this consumer, used for offset storage and must be unique
|
||||
topic: the topic to consume
|
||||
|
||||
auto_commit: default True. Whether or not to auto commit the offsets
|
||||
auto_commit_every_n: default 100. How many messages to consume
|
||||
before a commit
|
||||
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
||||
wait before commit
|
||||
|
||||
Auto commit details:
|
||||
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
||||
reset one another when one is triggered. These triggers simply call the
|
||||
commit method on this class. A manual call to commit will also reset
|
||||
these triggers
|
||||
|
||||
Class for managing the state of a consumer during fetch
|
||||
"""
|
||||
def __init__(self, client, group, topic, auto_commit=False,
|
||||
def __init__(self, consumer, block, timeout):
|
||||
self.consumer = consumer
|
||||
self.block = block
|
||||
|
||||
if block and not timeout:
|
||||
timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
|
||||
|
||||
self.timeout = timeout * 1000
|
||||
|
||||
def __enter__(self):
|
||||
"""Set fetch values based on blocking status"""
|
||||
if self.block:
|
||||
self.consumer.fetch_max_wait_time = self.timeout
|
||||
self.consumer.fetch_min_bytes = 1
|
||||
else:
|
||||
self.consumer.fetch_min_bytes = 0
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Reset values to default"""
|
||||
self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
|
||||
self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
|
||||
|
||||
|
||||
class Consumer(object):
|
||||
"""
|
||||
Base class to be used by other consumers. Not to be used directly
|
||||
|
||||
This base class provides logic for
|
||||
* initialization and fetching metadata of partitions
|
||||
* Auto-commit logic
|
||||
* APIs for fetching pending message count
|
||||
"""
|
||||
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
|
||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
|
||||
|
||||
self.client = client
|
||||
self.topic = topic
|
||||
self.group = group
|
||||
self.client._load_metadata_for_topics(topic)
|
||||
self.offsets = {}
|
||||
|
||||
if not partitions:
|
||||
partitions = self.client.topic_partitions[topic]
|
||||
|
||||
# Variables for handling offset commits
|
||||
self.commit_lock = Lock()
|
||||
self.commit_timer = None
|
||||
@@ -73,84 +100,17 @@ class SimpleConsumer(object):
|
||||
|
||||
# Uncomment for 0.8.1
|
||||
#
|
||||
#for partition in self.client.topic_partitions[topic]:
|
||||
#for partition in partitions:
|
||||
# req = OffsetFetchRequest(topic, partition)
|
||||
# (offset,) = self.client.send_offset_fetch_request(group, [req],
|
||||
# callback=get_or_init_offset_callback,
|
||||
# fail_on_error=False)
|
||||
# self.offsets[partition] = offset
|
||||
|
||||
for partition in self.client.topic_partitions[topic]:
|
||||
for partition in partitions:
|
||||
self.offsets[partition] = 0
|
||||
|
||||
def stop(self):
|
||||
if self.commit_timer is not None:
|
||||
self.commit_timer.stop()
|
||||
self.commit()
|
||||
|
||||
def seek(self, offset, whence):
|
||||
"""
|
||||
Alter the current offset in the consumer, similar to fseek
|
||||
|
||||
offset: how much to modify the offset
|
||||
whence: where to modify it from
|
||||
0 is relative to the earliest available offset (head)
|
||||
1 is relative to the current offset
|
||||
2 is relative to the latest known offset (tail)
|
||||
"""
|
||||
if whence == 1: # relative to current position
|
||||
for partition, _offset in self.offsets.items():
|
||||
self.offsets[partition] = _offset + offset
|
||||
elif whence in (0, 2): # relative to beginning or end
|
||||
# divide the request offset by number of partitions,
|
||||
# distribute the remained evenly
|
||||
(delta, rem) = divmod(offset, len(self.offsets))
|
||||
deltas = {}
|
||||
for partition, r in izip_longest(self.offsets.keys(),
|
||||
repeat(1, rem), fillvalue=0):
|
||||
deltas[partition] = delta + r
|
||||
|
||||
reqs = []
|
||||
for partition in self.offsets.keys():
|
||||
if whence == 0:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
|
||||
elif whence == 2:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||
else:
|
||||
pass
|
||||
|
||||
resps = self.client.send_offset_request(reqs)
|
||||
for resp in resps:
|
||||
self.offsets[resp.partition] = resp.offsets[0] + \
|
||||
deltas[resp.partition]
|
||||
else:
|
||||
raise ValueError("Unexpected value for `whence`, %d" % whence)
|
||||
|
||||
def pending(self, partitions=[]):
|
||||
"""
|
||||
Gets the pending message count
|
||||
|
||||
partitions: list of partitions to check for, default is to check all
|
||||
"""
|
||||
if len(partitions) == 0:
|
||||
partitions = self.offsets.keys()
|
||||
|
||||
total = 0
|
||||
reqs = []
|
||||
|
||||
for partition in partitions:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||
|
||||
resps = self.client.send_offset_request(reqs)
|
||||
for resp in resps:
|
||||
partition = resp.partition
|
||||
pending = resp.offsets[0]
|
||||
offset = self.offsets[partition]
|
||||
total += pending - offset - (1 if offset > 0 else 0)
|
||||
|
||||
return total
|
||||
|
||||
def commit(self, partitions=[]):
|
||||
def commit(self, partitions=None):
|
||||
"""
|
||||
Commit offsets for this consumer
|
||||
|
||||
@@ -170,7 +130,7 @@ class SimpleConsumer(object):
|
||||
return
|
||||
|
||||
reqs = []
|
||||
if len(partitions) == 0: # commit all partitions
|
||||
if not partitions: # commit all partitions
|
||||
partitions = self.offsets.keys()
|
||||
|
||||
for partition in partitions:
|
||||
@@ -200,6 +160,148 @@ class SimpleConsumer(object):
|
||||
if self.count_since_commit > self.auto_commit_every_n:
|
||||
self.commit()
|
||||
|
||||
def stop(self):
|
||||
if self.commit_timer is not None:
|
||||
self.commit_timer.stop()
|
||||
self.commit()
|
||||
|
||||
def pending(self, partitions=None):
|
||||
"""
|
||||
Gets the pending message count
|
||||
|
||||
partitions: list of partitions to check for, default is to check all
|
||||
"""
|
||||
if not partitions:
|
||||
partitions = self.offsets.keys()
|
||||
|
||||
total = 0
|
||||
reqs = []
|
||||
|
||||
for partition in partitions:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||
|
||||
resps = self.client.send_offset_request(reqs)
|
||||
for resp in resps:
|
||||
partition = resp.partition
|
||||
pending = resp.offsets[0]
|
||||
offset = self.offsets[partition]
|
||||
total += pending - offset - (1 if offset > 0 else 0)
|
||||
|
||||
return total
|
||||
|
||||
|
||||
class SimpleConsumer(Consumer):
|
||||
"""
|
||||
A simple consumer implementation that consumes all/specified partitions
|
||||
for a topic
|
||||
|
||||
client: a connected KafkaClient
|
||||
group: a name for this consumer, used for offset storage and must be unique
|
||||
topic: the topic to consume
|
||||
partitions: An optional list of partitions to consume the data from
|
||||
|
||||
auto_commit: default True. Whether or not to auto commit the offsets
|
||||
auto_commit_every_n: default 100. How many messages to consume
|
||||
before a commit
|
||||
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
||||
wait before commit
|
||||
|
||||
Auto commit details:
|
||||
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
||||
reset one another when one is triggered. These triggers simply call the
|
||||
commit method on this class. A manual call to commit will also reset
|
||||
these triggers
|
||||
"""
|
||||
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
|
||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
|
||||
|
||||
self.partition_info = False # Do not return partition info in msgs
|
||||
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
|
||||
self.fetch_min_bytes = FETCH_MIN_BYTES
|
||||
self.fetch_started = defaultdict(bool) # defaults to false
|
||||
|
||||
super(SimpleConsumer, self).__init__(client, group, topic,
|
||||
partitions=partitions,
|
||||
auto_commit=auto_commit,
|
||||
auto_commit_every_n=auto_commit_every_n,
|
||||
auto_commit_every_t=auto_commit_every_t)
|
||||
|
||||
def provide_partition_info(self):
|
||||
"""
|
||||
Indicates that partition info must be returned by the consumer
|
||||
"""
|
||||
self.partition_info = True
|
||||
|
||||
def seek(self, offset, whence):
|
||||
"""
|
||||
Alter the current offset in the consumer, similar to fseek
|
||||
|
||||
offset: how much to modify the offset
|
||||
whence: where to modify it from
|
||||
0 is relative to the earliest available offset (head)
|
||||
1 is relative to the current offset
|
||||
2 is relative to the latest known offset (tail)
|
||||
"""
|
||||
if whence == 1: # relative to current position
|
||||
for partition, _offset in self.offsets.items():
|
||||
self.offsets[partition] = _offset + offset
|
||||
elif whence in (0, 2): # relative to beginning or end
|
||||
# divide the request offset by number of partitions,
|
||||
# distribute the remained evenly
|
||||
(delta, rem) = divmod(offset, len(self.offsets))
|
||||
deltas = {}
|
||||
for partition, r in izip_longest(self.offsets.keys(),
|
||||
repeat(1, rem), fillvalue=0):
|
||||
deltas[partition] = delta + r
|
||||
|
||||
reqs = []
|
||||
for partition in self.offsets.keys():
|
||||
if whence == 0:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
|
||||
elif whence == 2:
|
||||
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
|
||||
|
||||
# The API returns back the next available offset
|
||||
# For eg: if the current offset is 18, the API will return
|
||||
# back 19. So, if we have to seek 5 points before, we will
|
||||
# end up going back to 14, instead of 13. Adjust this
|
||||
deltas[partition] -= 1
|
||||
else:
|
||||
pass
|
||||
|
||||
resps = self.client.send_offset_request(reqs)
|
||||
for resp in resps:
|
||||
self.offsets[resp.partition] = resp.offsets[0] + \
|
||||
deltas[resp.partition]
|
||||
else:
|
||||
raise ValueError("Unexpected value for `whence`, %d" % whence)
|
||||
|
||||
def get_messages(self, count=1, block=True, timeout=0.1):
|
||||
"""
|
||||
Fetch the specified number of messages
|
||||
|
||||
count: Indicates the maximum number of messages to be fetched
|
||||
block: If True, the API will block till some messages are fetched.
|
||||
timeout: If None, and block=True, the API will block infinitely.
|
||||
If >0, API will block for specified time (in seconds)
|
||||
"""
|
||||
messages = []
|
||||
iterator = self.__iter__()
|
||||
|
||||
# HACK: This splits the timeout between available partitions
|
||||
timeout = timeout * 1.0 / len(self.offsets)
|
||||
|
||||
with FetchContext(self, block, timeout):
|
||||
while count > 0:
|
||||
try:
|
||||
messages.append(next(iterator))
|
||||
except StopIteration as exp:
|
||||
break
|
||||
count -= 1
|
||||
|
||||
return messages
|
||||
|
||||
def __iter__(self):
|
||||
"""
|
||||
Create an iterate per partition. Iterate through them calling next()
|
||||
@@ -218,7 +320,10 @@ class SimpleConsumer(object):
|
||||
|
||||
for partition, it in iters.items():
|
||||
try:
|
||||
yield it.next()
|
||||
if self.partition_info:
|
||||
yield (partition, it.next())
|
||||
else:
|
||||
yield it.next()
|
||||
except StopIteration:
|
||||
log.debug("Done iterating over partition %s" % partition)
|
||||
del iters[partition]
|
||||
@@ -238,10 +343,23 @@ class SimpleConsumer(object):
|
||||
the end of this partition.
|
||||
"""
|
||||
|
||||
# The offset that is stored in the consumer is the offset that
|
||||
# we have consumed. In subsequent iterations, we are supposed to
|
||||
# fetch the next message (that is from the next offset)
|
||||
# However, for the 0th message, the offset should be as-is.
|
||||
# An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
|
||||
# problematic, since 0 is offset of a message which we have not yet
|
||||
# consumed.
|
||||
if self.fetch_started[partition]:
|
||||
offset += 1
|
||||
|
||||
while True:
|
||||
# TODO: configure fetch size
|
||||
req = FetchRequest(self.topic, partition, offset, 1024)
|
||||
(resp,) = self.client.send_fetch_request([req])
|
||||
|
||||
(resp,) = self.client.send_fetch_request([req],
|
||||
max_wait_time=self.fetch_max_wait_time,
|
||||
min_bytes=self.fetch_min_bytes)
|
||||
|
||||
assert resp.topic == self.topic
|
||||
assert resp.partition == partition
|
||||
@@ -249,10 +367,225 @@ class SimpleConsumer(object):
|
||||
next_offset = None
|
||||
for message in resp.messages:
|
||||
next_offset = message.offset
|
||||
yield message
|
||||
# update the internal state _after_ we yield the message
|
||||
|
||||
# update the offset before the message is yielded. This is
|
||||
# so that the consumer state is not lost in certain cases.
|
||||
# For eg: the message is yielded and consumed by the caller,
|
||||
# but the caller does not come back into the generator again.
|
||||
# The message will be consumed but the status will not be
|
||||
# updated in the consumer
|
||||
self.fetch_started[partition] = True
|
||||
self.offsets[partition] = message.offset
|
||||
yield message
|
||||
if next_offset is None:
|
||||
break
|
||||
else:
|
||||
offset = next_offset + 1
|
||||
|
||||
|
||||
class MultiProcessConsumer(Consumer):
|
||||
"""
|
||||
A consumer implementation that consumes partitions for a topic in
|
||||
parallel using multiple processes
|
||||
|
||||
client: a connected KafkaClient
|
||||
group: a name for this consumer, used for offset storage and must be unique
|
||||
topic: the topic to consume
|
||||
|
||||
auto_commit: default True. Whether or not to auto commit the offsets
|
||||
auto_commit_every_n: default 100. How many messages to consume
|
||||
before a commit
|
||||
auto_commit_every_t: default 5000. How much time (in milliseconds) to
|
||||
wait before commit
|
||||
num_procs: Number of processes to start for consuming messages.
|
||||
The available partitions will be divided among these processes
|
||||
partitions_per_proc: Number of partitions to be allocated per process
|
||||
(overrides num_procs)
|
||||
|
||||
Auto commit details:
|
||||
If both auto_commit_every_n and auto_commit_every_t are set, they will
|
||||
reset one another when one is triggered. These triggers simply call the
|
||||
commit method on this class. A manual call to commit will also reset
|
||||
these triggers
|
||||
"""
|
||||
def __init__(self, client, group, topic, auto_commit=True,
|
||||
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
|
||||
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
|
||||
num_procs=1, partitions_per_proc=0):
|
||||
|
||||
# Initiate the base consumer class
|
||||
super(MultiProcessConsumer, self).__init__(client, group, topic,
|
||||
partitions=None,
|
||||
auto_commit=auto_commit,
|
||||
auto_commit_every_n=auto_commit_every_n,
|
||||
auto_commit_every_t=auto_commit_every_t)
|
||||
|
||||
# Variables for managing and controlling the data flow from
|
||||
# consumer child process to master
|
||||
self.queue = Queue(1024) # Child consumers dump messages into this
|
||||
self.start = Event() # Indicates the consumers to start fetch
|
||||
self.exit = Event() # Requests the consumers to shutdown
|
||||
self.pause = Event() # Requests the consumers to pause fetch
|
||||
self.size = Value('i', 0) # Indicator of number of messages to fetch
|
||||
|
||||
partitions = self.offsets.keys()
|
||||
|
||||
# If unspecified, start one consumer per partition
|
||||
# The logic below ensures that
|
||||
# * we do not cross the num_procs limit
|
||||
# * we have an even distribution of partitions among processes
|
||||
if not partitions_per_proc:
|
||||
partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
|
||||
if partitions_per_proc < num_procs * 0.5:
|
||||
partitions_per_proc += 1
|
||||
|
||||
# The final set of chunks
|
||||
chunker = lambda *x: [] + list(x)
|
||||
chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
|
||||
|
||||
self.procs = []
|
||||
for chunk in chunks:
|
||||
chunk = filter(lambda x: x is not None, chunk)
|
||||
proc = Process(target=self._consume, args=(chunk,))
|
||||
proc.daemon = True
|
||||
proc.start()
|
||||
self.procs.append(proc)
|
||||
|
||||
def _consume(self, partitions):
|
||||
"""
|
||||
A child process worker which consumes messages based on the
|
||||
notifications given by the controller process
|
||||
"""
|
||||
|
||||
# Make the child processes open separate socket connections
|
||||
self.client.reinit()
|
||||
|
||||
# We will start consumers without auto-commit. Auto-commit will be
|
||||
# done by the master controller process.
|
||||
consumer = SimpleConsumer(self.client, self.group, self.topic,
|
||||
partitions=partitions,
|
||||
auto_commit=False,
|
||||
auto_commit_every_n=None,
|
||||
auto_commit_every_t=None)
|
||||
|
||||
# Ensure that the consumer provides the partition information
|
||||
consumer.provide_partition_info()
|
||||
|
||||
while True:
|
||||
# Wait till the controller indicates us to start consumption
|
||||
self.start.wait()
|
||||
|
||||
# If we are asked to quit, do so
|
||||
if self.exit.is_set():
|
||||
break
|
||||
|
||||
# Consume messages and add them to the queue. If the controller
|
||||
# indicates a specific number of messages, follow that advice
|
||||
count = 0
|
||||
|
||||
for partition, message in consumer:
|
||||
self.queue.put((partition, message))
|
||||
count += 1
|
||||
|
||||
# We have reached the required size. The controller might have
|
||||
# more than what he needs. Wait for a while.
|
||||
# Without this logic, it is possible that we run into a big
|
||||
# loop consuming all available messages before the controller
|
||||
# can reset the 'start' event
|
||||
if count == self.size.value:
|
||||
self.pause.wait()
|
||||
break
|
||||
|
||||
# In case we did not receive any message, give up the CPU for
|
||||
# a while before we try again
|
||||
if count == 0:
|
||||
time.sleep(0.1)
|
||||
|
||||
consumer.stop()
|
||||
|
||||
def stop(self):
|
||||
# Set exit and start off all waiting consumers
|
||||
self.exit.set()
|
||||
self.pause.set()
|
||||
self.start.set()
|
||||
|
||||
for proc in self.procs:
|
||||
proc.join()
|
||||
proc.terminate()
|
||||
|
||||
super(MultiProcessConsumer, self).stop()
|
||||
|
||||
def __iter__(self):
|
||||
"""
|
||||
Iterator to consume the messages available on this consumer
|
||||
"""
|
||||
# Trigger the consumer procs to start off.
|
||||
# We will iterate till there are no more messages available
|
||||
self.size.value = 0
|
||||
self.pause.set()
|
||||
|
||||
while True:
|
||||
self.start.set()
|
||||
try:
|
||||
# We will block for a small while so that the consumers get
|
||||
# a chance to run and put some messages in the queue
|
||||
# TODO: This is a hack and will make the consumer block for
|
||||
# at least one second. Need to find a better way of doing this
|
||||
partition, message = self.queue.get(block=True, timeout=1)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
# Count, check and commit messages if necessary
|
||||
self.offsets[partition] = message.offset
|
||||
self.start.clear()
|
||||
yield message
|
||||
|
||||
self.count_since_commit += 1
|
||||
self._auto_commit()
|
||||
|
||||
self.start.clear()
|
||||
|
||||
def get_messages(self, count=1, block=True, timeout=10):
|
||||
"""
|
||||
Fetch the specified number of messages
|
||||
|
||||
count: Indicates the maximum number of messages to be fetched
|
||||
block: If True, the API will block till some messages are fetched.
|
||||
timeout: If None, and block=True, the API will block infinitely.
|
||||
If >0, API will block for specified time (in seconds)
|
||||
"""
|
||||
messages = []
|
||||
|
||||
# Give a size hint to the consumers. Each consumer process will fetch
|
||||
# a maximum of "count" messages. This will fetch more messages than
|
||||
# necessary, but these will not be committed to kafka. Also, the extra
|
||||
# messages can be provided in subsequent runs
|
||||
self.size.value = count
|
||||
self.pause.clear()
|
||||
|
||||
while count > 0:
|
||||
# Trigger consumption only if the queue is empty
|
||||
# By doing this, we will ensure that consumers do not
|
||||
# go into overdrive and keep consuming thousands of
|
||||
# messages when the user might need only a few
|
||||
if self.queue.empty():
|
||||
self.start.set()
|
||||
|
||||
try:
|
||||
partition, message = self.queue.get(block, timeout)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
messages.append(message)
|
||||
|
||||
# Count, check and commit messages if necessary
|
||||
self.offsets[partition] = message.offset
|
||||
self.count_since_commit += 1
|
||||
self._auto_commit()
|
||||
count -= 1
|
||||
|
||||
self.size.value = 0
|
||||
self.start.clear()
|
||||
self.pause.set()
|
||||
|
||||
return messages
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import unittest
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from kafka import * # noqa
|
||||
from kafka.common import * # noqa
|
||||
@@ -544,7 +545,7 @@ class TestKafkaClient(unittest.TestCase):
|
||||
producer.stop()
|
||||
|
||||
|
||||
class TestSimpleConsumer(unittest.TestCase):
|
||||
class TestConsumer(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): # noqa
|
||||
cls.zk = ZookeeperFixture.instance()
|
||||
@@ -559,9 +560,9 @@ class TestSimpleConsumer(unittest.TestCase):
|
||||
cls.server2.close()
|
||||
cls.zk.close()
|
||||
|
||||
def test_consumer(self):
|
||||
def test_simple_consumer(self):
|
||||
# Produce 100 messages to partition 0
|
||||
produce1 = ProduceRequest("test_consumer", 0, messages=[
|
||||
produce1 = ProduceRequest("test_simple_consumer", 0, messages=[
|
||||
create_message("Test message 0 %d" % i) for i in range(100)
|
||||
])
|
||||
|
||||
@@ -570,7 +571,7 @@ class TestSimpleConsumer(unittest.TestCase):
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
# Produce 100 messages to partition 1
|
||||
produce2 = ProduceRequest("test_consumer", 1, messages=[
|
||||
produce2 = ProduceRequest("test_simple_consumer", 1, messages=[
|
||||
create_message("Test message 1 %d" % i) for i in range(100)
|
||||
])
|
||||
|
||||
@@ -579,7 +580,7 @@ class TestSimpleConsumer(unittest.TestCase):
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
# Start a consumer
|
||||
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
|
||||
consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer")
|
||||
all_messages = []
|
||||
for message in consumer:
|
||||
all_messages.append(message)
|
||||
@@ -602,31 +603,141 @@ class TestSimpleConsumer(unittest.TestCase):
|
||||
|
||||
self.assertEquals(len(all_messages), 13)
|
||||
|
||||
# Blocking API
|
||||
start = datetime.now()
|
||||
messages = consumer.get_messages(block=True, timeout=5)
|
||||
diff = (datetime.now() - start).total_seconds()
|
||||
self.assertGreaterEqual(diff, 5)
|
||||
self.assertEqual(len(messages), 0)
|
||||
|
||||
# Send 10 messages
|
||||
produce = ProduceRequest("test_simple_consumer", 0, messages=[
|
||||
create_message("Test message 0 %d" % i) for i in range(10)
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 100)
|
||||
|
||||
# Fetch 5 messages
|
||||
messages = consumer.get_messages(count=5, block=True, timeout=5)
|
||||
self.assertEqual(len(messages), 5)
|
||||
|
||||
# Fetch 10 messages
|
||||
start = datetime.now()
|
||||
messages = consumer.get_messages(count=10, block=True, timeout=5)
|
||||
self.assertEqual(len(messages), 5)
|
||||
diff = (datetime.now() - start).total_seconds()
|
||||
self.assertGreaterEqual(diff, 5)
|
||||
|
||||
consumer.stop()
|
||||
|
||||
def test_pending(self):
|
||||
def test_simple_consumer_pending(self):
|
||||
# Produce 10 messages to partition 0 and 1
|
||||
|
||||
produce1 = ProduceRequest("test_pending", 0, messages=[
|
||||
produce1 = ProduceRequest("test_simple_pending", 0, messages=[
|
||||
create_message("Test message 0 %d" % i) for i in range(10)
|
||||
])
|
||||
for resp in self.client.send_produce_request([produce1]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
produce2 = ProduceRequest("test_pending", 1, messages=[
|
||||
produce2 = ProduceRequest("test_simple_pending", 1, messages=[
|
||||
create_message("Test message 1 %d" % i) for i in range(10)
|
||||
])
|
||||
for resp in self.client.send_produce_request([produce2]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
consumer = SimpleConsumer(self.client, "group1", "test_pending")
|
||||
consumer = SimpleConsumer(self.client, "group1", "test_simple_pending")
|
||||
self.assertEquals(consumer.pending(), 20)
|
||||
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
||||
self.assertEquals(consumer.pending(partitions=[1]), 10)
|
||||
consumer.stop()
|
||||
|
||||
def test_multi_process_consumer(self):
|
||||
# Produce 100 messages to partition 0
|
||||
produce1 = ProduceRequest("test_mpconsumer", 0, messages=[
|
||||
create_message("Test message 0 %d" % i) for i in range(100)
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce1]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
# Produce 100 messages to partition 1
|
||||
produce2 = ProduceRequest("test_mpconsumer", 1, messages=[
|
||||
create_message("Test message 1 %d" % i) for i in range(100)
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce2]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
# Start a consumer
|
||||
consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer")
|
||||
all_messages = []
|
||||
for message in consumer:
|
||||
all_messages.append(message)
|
||||
|
||||
self.assertEquals(len(all_messages), 200)
|
||||
# Make sure there are no duplicates
|
||||
self.assertEquals(len(all_messages), len(set(all_messages)))
|
||||
|
||||
# Blocking API
|
||||
start = datetime.now()
|
||||
messages = consumer.get_messages(block=True, timeout=5)
|
||||
diff = (datetime.now() - start).total_seconds()
|
||||
self.assertGreaterEqual(diff, 5)
|
||||
self.assertEqual(len(messages), 0)
|
||||
|
||||
# Send 10 messages
|
||||
produce = ProduceRequest("test_mpconsumer", 0, messages=[
|
||||
create_message("Test message 0 %d" % i) for i in range(10)
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 100)
|
||||
|
||||
# Fetch 5 messages
|
||||
messages = consumer.get_messages(count=5, block=True, timeout=5)
|
||||
self.assertEqual(len(messages), 5)
|
||||
|
||||
# Fetch 10 messages
|
||||
start = datetime.now()
|
||||
messages = consumer.get_messages(count=10, block=True, timeout=5)
|
||||
self.assertEqual(len(messages), 5)
|
||||
diff = (datetime.now() - start).total_seconds()
|
||||
self.assertGreaterEqual(diff, 5)
|
||||
|
||||
consumer.stop()
|
||||
|
||||
def test_multi_proc_pending(self):
|
||||
# Produce 10 messages to partition 0 and 1
|
||||
produce1 = ProduceRequest("test_mppending", 0, messages=[
|
||||
create_message("Test message 0 %d" % i) for i in range(10)
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce1]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
produce2 = ProduceRequest("test_mppending", 1, messages=[
|
||||
create_message("Test message 1 %d" % i) for i in range(10)
|
||||
])
|
||||
|
||||
for resp in self.client.send_produce_request([produce2]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
consumer = MultiProcessConsumer(self.client, "group1", "test_mppending")
|
||||
self.assertEquals(consumer.pending(), 20)
|
||||
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
||||
self.assertEquals(consumer.pending(partitions=[1]), 10)
|
||||
|
||||
consumer.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
Reference in New Issue
Block a user