@@ -9,24 +9,27 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
 log = logging.getLogger("kafka")
 
 
 class KafkaConsumerProcess(Process):
-    def __init__(self, client, topic, partition, out_queue, barrier):
+    def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200):
         self.client = copy(client)
         self.topic = topic
         self.partition = partition
         self.out_queue = out_queue
         self.barrier = barrier
-        self.consumer_sleep = 0.2
+        self.consumer_sleep = consumer_sleep / 1000.
+        log.info("Initializing %s" % self)
         Process.__init__(self)
 
-    def config(self, consumer_sleep):
-        self.consumer_sleep = consumer_sleep / 1000.
-
     def __str__(self):
         return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % (
             self.topic, self.partition, self.consumer_sleep)
 
     def run(self):
         self.barrier.wait()
-        log.info("Starting Consumer")
+        log.info("Starting %s" % self)
         fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
         while True:
             if self.barrier.is_set() == False:
                 log.info("Shutdown %s" % self)
                 self.client.close()
                 break
             lastOffset = fetchRequest.offset
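For readers skimming the diff, the practical effect of the consumer change is that the sleep interval moves from a separate config() call into the constructor. A minimal usage sketch follows, assuming KafkaClient and KafkaConsumerProcess are importable from this module; the "localhost"/9092 connection details, topic name, and partition number are placeholders, not part of the diff:

    # Hypothetical sketch, not part of the diff: consumer_sleep is now a constructor
    # keyword argument (previously set via consumer.config(consumer_sleep=200)).
    # It is given in milliseconds and stored internally in seconds (200 ms -> 0.2 s).
    from multiprocessing import Event, Queue

    client = KafkaClient("localhost", 9092)   # assumed connection parameters
    in_queue = Queue()
    barrier = Event()

    consumer = KafkaConsumerProcess(client, "test-topic", 0, in_queue, barrier,
                                    consumer_sleep=200)
    consumer.start()
    barrier.set()   # run() blocks on barrier.wait() until the event is set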
@@ -39,24 +42,24 @@ class KafkaConsumerProcess(Process):
                 self.out_queue.put(message)
 
 class KafkaProducerProcess(Process):
-    def __init__(self, client, topic, in_queue, barrier):
+    def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100):
         self.client = copy(client)
         self.topic = topic
         self.in_queue = in_queue
         self.barrier = barrier
-        self.producer_flush_buffer = 100
-        self.producer_flush_timeout = 2.0
-        self.producer_timeout = 0.1
-        Process.__init__(self)
-
-    def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
         self.producer_flush_buffer = producer_flush_buffer
         self.producer_flush_timeout = producer_flush_timeout / 1000.
         self.producer_timeout = producer_timeout / 1000.
+        log.info("Initializing %s" % self)
+        Process.__init__(self)
 
     def __str__(self):
         return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % (
             self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout)
 
     def run(self):
         self.barrier.wait()
-        log.info("Starting Producer")
+        log.info("Starting %s" % self)
         messages = []
         last_produce = time.time()
 
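The producer follows the same pattern. A short sketch, reusing the assumed client and barrier objects from the consumer sketch above: the flush buffer is a message count, while both timeouts are given in milliseconds and converted to seconds internally (2000 ms -> 2.0 s, 100 ms -> 0.1 s):

    # Hypothetical sketch, not part of the diff.
    out_queue = Queue()
    producer = KafkaProducerProcess(client, "test-topic", out_queue, barrier,
                                    producer_flush_buffer=500,
                                    producer_flush_timeout=2000,
                                    producer_timeout=100)
    producer.start()
    out_queue.put("hello kafka")   # run() batches messages from this queue and flushes them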
@@ -66,13 +69,14 @@ class KafkaProducerProcess(Process):
 
         while True:
             if self.barrier.is_set() == False:
-                log.info("Producer shut down. Flushing messages")
+                log.info("Shutdown %s, flushing messages" % self)
                 flush(messages)
                 self.client.close()
                 break
             if len(messages) > self.producer_flush_buffer:
                 log.debug("Message count threshold reached. Flushing messages")
                 flush(messages)
                 last_produce = time.time()
             elif (time.time() - last_produce) > self.producer_flush_timeout:
                 log.debug("Producer timeout reached. Flushing messages")
                 flush(messages)
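In other words, the loop above flushes a pending batch when either trigger fires: the batch grows past producer_flush_buffer messages, or more than producer_flush_timeout has elapsed since the last flush. A standalone sketch of that decision rule with the new defaults (500 messages, 2000 ms), using hypothetical helper names:

    import time

    def should_flush(batch, last_flush, flush_buffer=500, flush_timeout=2.0):
        # Size trigger: more than flush_buffer messages are waiting, OR
        # age trigger: the batch has been accumulating for over flush_timeout seconds.
        return len(batch) > flush_buffer or (time.time() - last_flush) > flush_timeout

    print(should_flush(["msg"] * 501, last_flush=time.time()))    # True (size trigger)
    print(should_flush([], last_flush=time.time() - 3))           # True (age trigger)
    print(should_flush(["msg"] * 10, last_flush=time.time()))     # False (keep batching)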
@@ -83,7 +87,33 @@ class KafkaProducerProcess(Process):
                 continue
 
 class KafkaQueue(object):
-    def __init__(self, client, topic, partitions):
+    def __init__(self, client, topic, partitions, producer_config={}, consumer_config={}):
+        """
+        KafkaQueue is a Queue-like object backed by a Kafka producer and some number of consumers
+
+        Params
+        ======
+        client: KafkaClient object
+        topic: str, the topic name
+        partitions: list of ints, the partitions to consume from
+        producer_config: dict, see below
+        consumer_config: dict, see below
+
+        Consumer Config
+        ===============
+        consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
+            the end of a partition. Default is 200
+
+        Producer Config
+        ===============
+        producer_timeout: int, time in milliseconds a producer should wait for messages to
+            enqueue for producing. Default is 100
+        producer_flush_timeout: int, time in milliseconds a producer should allow messages
+            to accumulate before sending to Kafka. Default is 2000
+        producer_flush_buffer: int, number of messages a producer should allow to accumulate.
+            Default is 500
+
+        """
         self.in_queue = Queue()
         self.out_queue = Queue()
         self.consumers = []
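Putting the docstring above to use, here is a usage sketch of the new config dicts. The broker address and topic are placeholders, the put()/get() calls are the Queue-like interface the class advertises, and any key left out of a dict falls back to the defaults listed above:

    # Hypothetical usage sketch, not part of the diff.
    client = KafkaClient("localhost", 9092)   # assumed connection parameters
    q = KafkaQueue(client, "test-topic", [0, 1],
                   consumer_config={"consumer_sleep": 100},           # ms
                   producer_config={"producer_flush_buffer": 1000,
                                    "producer_flush_timeout": 5000,   # ms
                                    "producer_timeout": 50})          # ms
    q.put("hello kafka")   # handed to the producer process
    msg = q.get()          # pulled from one of the partition consumers
    q.close()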
@@ -91,14 +121,12 @@ class KafkaQueue(object):
 
         # Initialize and start consumer threads
         for partition in partitions:
-            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier)
-            consumer.config(consumer_sleep=200)
+            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config)
             consumer.start()
             self.consumers.append(consumer)
 
         # Initialize and start producer thread
-        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier)
-        self.producer.config(producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100)
+        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config)
         self.producer.start()
 
         # Trigger everything to start
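Because the dicts are expanded with ** straight into the process constructors, a misspelled key now surfaces immediately as a TypeError from the corresponding __init__ instead of being applied as a setting. A small illustration with a deliberately bad, hypothetical key:

    # Hypothetical sketch, not part of the diff.
    try:
        KafkaQueue(client, "test-topic", [0],
                   consumer_config={"consumer_sleeep": 100})   # typo'd key
    except TypeError as e:
        log.error("Bad consumer_config: %s" % e)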
@@ -258,4 +258,5 @@ class IntegrationTest(unittest.TestCase):
         q.close()
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
     unittest.main()