Do not spin when more processors than partitions
If monasca-notification is configured with more notification processors than there are partitions in the corresponding topic then one or more of the processor instances will not be allocated any partitions. At present this results in the while loop spinning and wasting CPU cycles. Instead, this change will block until the SetPartitioner indicates that the state has changed, at which point the assigned partitions will be re-evaluated. Change-Id: I55e8b1b52b704618cb35593f7bdfc1415d8b584d
This commit is contained in:
parent
ca21c1feed
commit
4b123f4327
@ -18,6 +18,7 @@ import kafka.common
|
||||
import kafka.consumer
|
||||
import logging
|
||||
import monascastatsd
|
||||
import threading
|
||||
|
||||
from kazoo.client import KazooClient
|
||||
from kazoo.recipe.partitioner import SetPartitioner
|
||||
@ -68,10 +69,13 @@ class KafkaConsumer(BaseProcessor):
|
||||
kazoo_client = KazooClient(hosts=self._zookeeper_url)
|
||||
kazoo_client.start()
|
||||
|
||||
state_change_event = threading.Event()
|
||||
|
||||
set_partitioner = (
|
||||
SetPartitioner(kazoo_client,
|
||||
path=self._zookeeper_path,
|
||||
set=self._consumer.fetch_offsets.keys()))
|
||||
set=self._consumer.fetch_offsets.keys(),
|
||||
state_change_event=state_change_event))
|
||||
|
||||
consumed_from_kafka = self._statsd.get_counter(name='consumed_from_kafka')
|
||||
|
||||
@ -94,6 +98,14 @@ class KafkaConsumer(BaseProcessor):
|
||||
if not partitions:
|
||||
partitions = [p for p in set_partitioner]
|
||||
|
||||
if not partitions:
|
||||
log.info("Not assigned any partitions on topic {},"
|
||||
" waiting for a Partitioner state change"
|
||||
.format(self._kafka_topic))
|
||||
state_change_event.wait()
|
||||
state_change_event.clear()
|
||||
continue
|
||||
|
||||
log.info("Acquired locks on partition set {} "
|
||||
"for topic {}".format(partitions, self._kafka_topic))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user