Merge "Do not spin when more processors than partitions"
This commit is contained in:
commit
df5c99d3bb
@ -18,6 +18,7 @@ import kafka.common
|
||||
import kafka.consumer
|
||||
import logging
|
||||
import monascastatsd
|
||||
import threading
|
||||
|
||||
from kazoo.client import KazooClient
|
||||
from kazoo.recipe.partitioner import SetPartitioner
|
||||
@ -69,10 +70,13 @@ class KafkaConsumer(BaseProcessor):
|
||||
kazoo_client = KazooClient(hosts=self._zookeeper_url)
|
||||
kazoo_client.start()
|
||||
|
||||
state_change_event = threading.Event()
|
||||
|
||||
set_partitioner = (
|
||||
SetPartitioner(kazoo_client,
|
||||
path=self._zookeeper_path,
|
||||
set=self._consumer.fetch_offsets.keys()))
|
||||
set=self._consumer.fetch_offsets.keys(),
|
||||
state_change_event=state_change_event))
|
||||
|
||||
consumed_from_kafka = self._statsd.get_counter(name='consumed_from_kafka')
|
||||
|
||||
@ -95,6 +99,14 @@ class KafkaConsumer(BaseProcessor):
|
||||
if not partitions:
|
||||
partitions = [p for p in set_partitioner]
|
||||
|
||||
if not partitions:
|
||||
log.info("Not assigned any partitions on topic {},"
|
||||
" waiting for a Partitioner state change"
|
||||
.format(self._kafka_topic))
|
||||
state_change_event.wait()
|
||||
state_change_event.clear()
|
||||
continue
|
||||
|
||||
log.info("Acquired locks on partition set {} "
|
||||
"for topic {}".format(partitions, self._kafka_topic))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user