from __future__ import absolute_import

from itertools import izip_longest, repeat
import logging
import time
import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue

import kafka
from kafka.common import (
    FetchRequest,
    OffsetRequest, OffsetCommitRequest,
    OffsetFetchRequest,
    ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)

from kafka.util import ReentrantTimer

log = logging.getLogger("kafka")

AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000

FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8

ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1


class FetchContext(object):
    """
    Class for managing the state of a consumer during fetch
    """
    def __init__(self, consumer, block, timeout):
        self.consumer = consumer
        self.block = block

        if block:
            if not timeout:
                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
            self.timeout = timeout * 1000  # Kafka expects max wait time in ms

    def __enter__(self):
        """Set fetch values based on blocking status"""
        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
        if self.block:
            self.consumer.fetch_max_wait_time = self.timeout
            self.consumer.fetch_min_bytes = 1
        else:
            self.consumer.fetch_min_bytes = 0

    def __exit__(self, type, value, traceback):
        """Reset values"""
        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes


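# FetchContext is used internally by SimpleConsumer._get_message() below,
# roughly like this (illustrative sketch of the call pattern in this module):
#
#     with FetchContext(self, block, timeout):
#         self._fetch()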
class Consumer(object):
    """
    Base class to be used by other consumers. Not to be used directly.

    This base class provides logic for:
    * initialization and fetching of partition metadata
    * auto-commit logic
    * APIs for fetching the pending message count
    """
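    # Usage pattern for subclasses (a sketch of what SimpleConsumer and
    # MultiProcessConsumer below actually do): after handing a message to the
    # caller, record the new offset, bump the counter and let _auto_commit()
    # decide whether a commit is due.
    #
    #     self.offsets[partition] = message.offset + 1
    #     self.count_since_commit += 1
    #     self._auto_commit()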
    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):

        self.client = client
        self.topic = topic
        self.group = group
        self.client.load_metadata_for_topics(topic)
        self.offsets = {}

        if not partitions:
            partitions = self.client.get_partition_ids_for_topic(topic)
        else:
            assert all(isinstance(x, numbers.Integral) for x in partitions)

        # Variables for handling offset commits
        self.commit_lock = Lock()
        self.commit_timer = None
        self.count_since_commit = 0
        self.auto_commit = auto_commit
        self.auto_commit_every_n = auto_commit_every_n
        self.auto_commit_every_t = auto_commit_every_t

        # Set up the auto-commit timer
        if auto_commit is True and auto_commit_every_t is not None:
            self.commit_timer = ReentrantTimer(auto_commit_every_t,
                                               self.commit)
            self.commit_timer.start()

        if auto_commit:
            self.fetch_last_known_offsets(partitions)
        else:
            for partition in partitions:
                self.offsets[partition] = 0

    def fetch_last_known_offsets(self, partitions=None):
        if not partitions:
            partitions = self.client.get_partition_ids_for_topic(self.topic)

        def get_or_init_offset_callback(resp):
            try:
                kafka.common.check_error(resp)
                return resp.offset
            except kafka.common.UnknownTopicOrPartitionError:
                # No committed offset stored for this partition yet; start at 0
                return 0

        for partition in partitions:
            req = OffsetFetchRequest(self.topic, partition)
            (offset,) = self.client.send_offset_fetch_request(self.group, [req],
                          callback=get_or_init_offset_callback,
                          fail_on_error=False)
            self.offsets[partition] = offset
        self.fetch_offsets = self.offsets.copy()

    def commit(self, partitions=None):
        """
        Commit offsets for this consumer

        partitions: list of partitions to commit, default is to commit
                    all of them
        """

        # Short circuit if nothing happened. This check is kept outside
        # the lock to avoid unnecessarily acquiring it just to check the state
        if self.count_since_commit == 0:
            return

        with self.commit_lock:
            # Do this check again, in case the state changed while we
            # were waiting to acquire the lock
            if self.count_since_commit == 0:
                return

            reqs = []
            if not partitions:  # commit all partitions
                partitions = self.offsets.keys()

            for partition in partitions:
                offset = self.offsets[partition]
                log.debug("Commit offset %d in SimpleConsumer: "
                          "group=%s, topic=%s, partition=%s" %
                          (offset, self.group, self.topic, partition))

                reqs.append(OffsetCommitRequest(self.topic, partition,
                                                offset, None))

            resps = self.client.send_offset_commit_request(self.group, reqs)
            for resp in resps:
                kafka.common.check_error(resp)

            self.count_since_commit = 0

    def _auto_commit(self):
        """
        Commit if the number of messages consumed since the last commit
        has reached the auto-commit threshold
        """

        # Check if we are supposed to do an auto-commit
        if not self.auto_commit or self.auto_commit_every_n is None:
            return

        if self.count_since_commit >= self.auto_commit_every_n:
            self.commit()

    def stop(self):
        if self.commit_timer is not None:
            self.commit_timer.stop()
            self.commit()

    def pending(self, partitions=None):
        """
        Gets the pending message count

        partitions: list of partitions to check for, default is to check all
        """
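        # Illustrative calls (examples added for clarity, not in the original):
        #   backlog = consumer.pending()       # total across all partitions
        #   backlog0 = consumer.pending([0])   # just partition 0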
        if not partitions:
            partitions = self.offsets.keys()

        total = 0
        reqs = []

        for partition in partitions:
            # time=-1 asks the broker for its latest offset for the partition
            reqs.append(OffsetRequest(self.topic, partition, -1, 1))

        resps = self.client.send_offset_request(reqs)
        for resp in resps:
            partition = resp.partition
            pending = resp.offsets[0]
            offset = self.offsets[partition]
            total += pending - offset - (1 if offset > 0 else 0)

        return total


class SimpleConsumer(Consumer):
    """
    A simple consumer implementation that consumes all/specified partitions
    for a topic

    client: a connected KafkaClient
    group: a name for this consumer, used for offset storage and must be unique
    topic: the topic to consume
    partitions: An optional list of partitions to consume the data from

    auto_commit: default True. Whether or not to auto commit the offsets
    auto_commit_every_n: default 100. How many messages to consume
                         before a commit
    auto_commit_every_t: default 5000. How much time (in milliseconds) to
                         wait before commit
    fetch_size_bytes:    number of bytes to request in a FetchRequest
    buffer_size:         default 4K. Initial number of bytes to tell kafka we
                         have available. This will double as needed.
    max_buffer_size:     default 32K. Max number of bytes to tell kafka we have
                         available. None means no limit.
    iter_timeout:        default None. How much time (in seconds) to wait for a
                         message in the iterator before exiting. None means no
                         timeout, so it will wait forever.

    Auto commit details:
    If both auto_commit_every_n and auto_commit_every_t are set, they will
    reset one another when one is triggered. These triggers simply call the
    commit method on this class. A manual call to commit will also reset
    these triggers.
    """
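    # Illustrative usage (a sketch, not from the original file; assumes
    # `client` is an already-connected KafkaClient and the topic exists):
    #
    #     consumer = SimpleConsumer(client, "my-group", "my-topic")
    #     for message in consumer:
    #         process(message)       # `process` is a placeholder
    #     consumer.stop()            # flushes the final offset commit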
    def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
                 fetch_size_bytes=FETCH_MIN_BYTES,
                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                 iter_timeout=None):
        super(SimpleConsumer, self).__init__(
            client, group, topic,
            partitions=partitions,
            auto_commit=auto_commit,
            auto_commit_every_n=auto_commit_every_n,
            auto_commit_every_t=auto_commit_every_t)

        if max_buffer_size is not None and buffer_size > max_buffer_size:
            raise ValueError("buffer_size (%d) is greater than "
                             "max_buffer_size (%d)" %
                             (buffer_size, max_buffer_size))
        self.buffer_size = buffer_size
        self.max_buffer_size = max_buffer_size
        self.partition_info = False     # Do not return partition info in msgs
        self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
        self.fetch_min_bytes = fetch_size_bytes
        self.fetch_offsets = self.offsets.copy()
        self.iter_timeout = iter_timeout
        self.queue = Queue()

    def __repr__(self):
        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
            (self.group, self.topic, str(self.offsets.keys()))

    def provide_partition_info(self):
        """
        Indicates that partition info must be returned by the consumer
        """
        self.partition_info = True

    def seek(self, offset, whence):
        """
        Alter the current offset in the consumer, similar to fseek

        offset: how much to modify the offset by
        whence: where to modify it from
                0 is relative to the earliest available offset (head)
                1 is relative to the current offset
                2 is relative to the latest known offset (tail)
        """
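        # Illustrative calls (examples added for clarity, not in the original):
        #   consumer.seek(0, 0)    # rewind to the earliest available messages
        #   consumer.seek(0, 2)    # jump to the tail; consume only new messages
        #   consumer.seek(-5, 2)   # back up ~5 messages, spread across partitions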

        if whence == 1:  # relative to current position
            for partition, _offset in self.offsets.items():
                self.offsets[partition] = _offset + offset
        elif whence in (0, 2):  # relative to beginning or end
            # divide the request offset by number of partitions,
            # distribute the remainder evenly
            (delta, rem) = divmod(offset, len(self.offsets))
            deltas = {}
            for partition, r in izip_longest(self.offsets.keys(),
                                             repeat(1, rem), fillvalue=0):
                deltas[partition] = delta + r

            reqs = []
            for partition in self.offsets.keys():
                if whence == 0:
                    # time=-2 requests the earliest available offset
                    reqs.append(OffsetRequest(self.topic, partition, -2, 1))
                elif whence == 2:
                    # time=-1 requests the latest offset
                    reqs.append(OffsetRequest(self.topic, partition, -1, 1))
                else:
                    pass

            resps = self.client.send_offset_request(reqs)
            for resp in resps:
                self.offsets[resp.partition] = \
                    resp.offsets[0] + deltas[resp.partition]
        else:
            raise ValueError("Unexpected value for `whence`, %d" % whence)

        # Reset queue and fetch offsets since they are invalid
        self.fetch_offsets = self.offsets.copy()
        if self.auto_commit:
            self.count_since_commit += 1
            self.commit()

        self.queue = Queue()

    def get_messages(self, count=1, block=True, timeout=0.1):
        """
        Fetch the specified number of messages

        count: Indicates the maximum number of messages to be fetched
        block: If True, the API will block until some messages are fetched.
        timeout: If block is True, the function will block for the specified
                 time (in seconds) until count messages are fetched. If None,
                 it will block forever.
        """
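        # Illustrative call (example added for clarity, not in the original):
        #   batch = consumer.get_messages(count=100, block=True, timeout=5)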
        messages = []
        if timeout is not None:
            max_time = time.time() + timeout

        new_offsets = {}
        while count > 0 and (timeout is None or timeout > 0):
            result = self._get_message(block, timeout, get_partition_info=True,
                                       update_offset=False)
            if result:
                partition, message = result
                if self.partition_info:
                    messages.append(result)
                else:
                    messages.append(message)
                new_offsets[partition] = message.offset + 1
                count -= 1
            else:
                # Ran out of messages for the last request.
                if not block:
                    # If we're not blocking, break.
                    break
                if timeout is not None:
                    # If we're blocking and have a timeout, reduce it to the
                    # appropriate value
                    timeout = max_time - time.time()

        # Update and commit offsets if necessary
        self.offsets.update(new_offsets)
        self.count_since_commit += len(messages)
        self._auto_commit()
        return messages

    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
        return self._get_message(block, timeout, get_partition_info)

    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
                     update_offset=True):
        """
        If no messages can be fetched, returns None.
        If get_partition_info is None, it defaults to self.partition_info
        If get_partition_info is True, returns (partition, message)
        If get_partition_info is False, returns message
        """
        if self.queue.empty():
            # We're out of messages, go grab some more.
            with FetchContext(self, block, timeout):
                self._fetch()
        try:
            partition, message = self.queue.get_nowait()

            if update_offset:
                # Update partition offset
                self.offsets[partition] = message.offset + 1

                # Count, check and commit messages if necessary
                self.count_since_commit += 1
                self._auto_commit()

            if get_partition_info is None:
                get_partition_info = self.partition_info
            if get_partition_info:
                return partition, message
            else:
                return message
        except Empty:
            return None

    def __iter__(self):
        if self.iter_timeout is None:
            timeout = ITER_TIMEOUT_SECONDS
        else:
            timeout = self.iter_timeout

        while True:
            message = self.get_message(True, timeout)
            if message:
                yield message
            elif self.iter_timeout is None:
                # We did not receive any message yet but we don't have a
                # timeout, so give up the CPU for a while before trying again
                time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
            else:
                # Timed out waiting for a message
                break

    def _fetch(self):
        # Create fetch request payloads for all the partitions
        partitions = dict((p, self.buffer_size)
                          for p in self.fetch_offsets.keys())
        while partitions:
            requests = []
            for partition, buffer_size in partitions.iteritems():
                requests.append(FetchRequest(self.topic, partition,
                                             self.fetch_offsets[partition],
                                             buffer_size))
            # Send request
            responses = self.client.send_fetch_request(
                requests,
                max_wait_time=int(self.fetch_max_wait_time),
                min_bytes=self.fetch_min_bytes)

            retry_partitions = {}
            for resp in responses:
                partition = resp.partition
                buffer_size = partitions[partition]
                try:
                    for message in resp.messages:
                        # Put the message in our queue
                        self.queue.put((partition, message))
                        self.fetch_offsets[partition] = message.offset + 1
                except ConsumerFetchSizeTooSmall:
                    if (self.max_buffer_size is not None and
                            buffer_size == self.max_buffer_size):
                        log.error("Max fetch size %d too small",
                                  self.max_buffer_size)
                        raise
                    if self.max_buffer_size is None:
                        buffer_size *= 2
                    else:
                        # Double the buffer, but never exceed the configured cap
                        buffer_size = min(buffer_size * 2,
                                          self.max_buffer_size)
                    log.warn("Fetch size too small, increase to %d (2x) "
                             "and retry", buffer_size)
                    retry_partitions[partition] = buffer_size
                except ConsumerNoMoreData as e:
                    log.debug("Iteration was ended by %r", e)
                except StopIteration:
                    # Stop iterating through this partition
                    log.debug("Done iterating over partition %s", partition)
            partitions = retry_partitions

def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
    """
    A child process worker which consumes messages based on the
    notifications given by the controller process

    NOTE: Ideally, this should have been a method inside the Consumer
    class. However, the multiprocessing module has issues on Windows. The
    functionality breaks unless this function is kept outside of a class
    """
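    # Control protocol (descriptive note, summarizing how MultiProcessConsumer
    # below drives this worker): `start` tells the worker to fetch, `exit`
    # asks it to shut down, and `size` caps how many messages it queues before
    # blocking until `pause` is set again.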

    # Make the child processes open separate socket connections
    client.reinit()

    # We will start consumers without auto-commit. Auto-commit will be
    # done by the master controller process.
    consumer = SimpleConsumer(client, group, topic,
                              partitions=chunk,
                              auto_commit=False,
                              auto_commit_every_n=None,
                              auto_commit_every_t=None)

    # Ensure that the consumer provides the partition information
    consumer.provide_partition_info()

    while True:
        # Wait until the controller tells us to start consuming
        start.wait()

        # If we are asked to quit, do so
        if exit.is_set():
            break

        # Consume messages and add them to the queue. If the controller
        # indicates a specific number of messages, follow that advice
        count = 0

        message = consumer.get_message()
        if message:
            queue.put(message)
            count += 1

            # We have reached the required size. The controller might have
            # more than it needs. Wait for a while.
            # Without this logic, it is possible that we run into a big
            # loop consuming all available messages before the controller
            # can reset the 'start' event
            if count == size.value:
                pause.wait()

        else:
            # In case we did not receive any message, give up the CPU for
            # a while before we try again
            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)

    consumer.stop()


class MultiProcessConsumer(Consumer):
    """
    A consumer implementation that consumes partitions for a topic in
    parallel using multiple processes

    client: a connected KafkaClient
    group: a name for this consumer, used for offset storage and must be unique
    topic: the topic to consume

    auto_commit: default True. Whether or not to auto commit the offsets
    auto_commit_every_n: default 100. How many messages to consume
                         before a commit
    auto_commit_every_t: default 5000. How much time (in milliseconds) to
                         wait before commit
    num_procs: Number of processes to start for consuming messages.
               The available partitions will be divided among these processes
    partitions_per_proc: Number of partitions to be allocated per process
               (overrides num_procs)

    Auto commit details:
    If both auto_commit_every_n and auto_commit_every_t are set, they will
    reset one another when one is triggered. These triggers simply call the
    commit method on this class. A manual call to commit will also reset
    these triggers.
    """
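    # Illustrative usage (a sketch, not from the original file; assumes
    # `client` is an already-connected KafkaClient and the topic exists):
    #
    #     consumer = MultiProcessConsumer(client, "my-group", "my-topic",
    #                                     num_procs=4)
    #     for message in consumer:
    #         process(message)       # `process` is a placeholder
    #     consumer.stop()            # joins the worker processes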
    def __init__(self, client, group, topic, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
                 num_procs=1, partitions_per_proc=0):

        # Initialize the base consumer class
        super(MultiProcessConsumer, self).__init__(
            client, group, topic,
            partitions=None,
            auto_commit=auto_commit,
            auto_commit_every_n=auto_commit_every_n,
            auto_commit_every_t=auto_commit_every_t)

        # Variables for managing and controlling the data flow from
        # consumer child process to master
        self.queue = MPQueue(1024)  # Child consumers dump messages into this
        self.start = Event()        # Signals the consumers to start fetching
        self.exit = Event()         # Requests the consumers to shut down
        self.pause = Event()        # Requests the consumers to pause fetching
        self.size = Value('i', 0)   # Number of messages to fetch

        partitions = self.offsets.keys()

        # If unspecified, start one consumer per partition
        # The logic below ensures that
        # * we do not cross the num_procs limit
        # * we have an even distribution of partitions among processes
        if not partitions_per_proc:
            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
            if partitions_per_proc < num_procs * 0.5:
                partitions_per_proc += 1

        # The final set of chunks
        # (Python 2's map() with multiple iterables pads the last chunk with
        # None when the partitions do not divide evenly; the Nones are
        # filtered out below)
        chunker = lambda *x: [] + list(x)
        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))

        self.procs = []
        for chunk in chunks:
            chunk = filter(lambda x: x is not None, chunk)
            args = (client.copy(),
                    group, topic, chunk,
                    self.queue, self.start, self.exit,
                    self.pause, self.size)

            proc = Process(target=_mp_consume, args=args)
            proc.daemon = True
            proc.start()
            self.procs.append(proc)

    def __repr__(self):
        return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
            (self.group, self.topic, len(self.procs))

    def stop(self):
        # Signal the consumers to exit and release any that are waiting
        self.exit.set()
        self.pause.set()
        self.start.set()

        for proc in self.procs:
            proc.join()
            proc.terminate()

        super(MultiProcessConsumer, self).stop()

    def __iter__(self):
        """
        Iterator to consume the messages available on this consumer
        """
        # Trigger the consumer procs to start off.
        # We will iterate until there are no more messages available
        self.size.value = 0
        self.pause.set()

        while True:
            self.start.set()
            try:
                # We will block for a small while so that the consumers get
                # a chance to run and put some messages in the queue
                # TODO: This is a hack and will make the consumer block for
                # at least one second. Need to find a better way of doing this
                partition, message = self.queue.get(block=True, timeout=1)
            except Empty:
                break

            # Count, check and commit messages if necessary
            self.offsets[partition] = message.offset + 1
            self.start.clear()
            self.count_since_commit += 1
            self._auto_commit()
            yield message

        self.start.clear()

    def get_messages(self, count=1, block=True, timeout=10):
        """
        Fetch the specified number of messages

        count: Indicates the maximum number of messages to be fetched
        block: If True, the API will block until some messages are fetched.
        timeout: If block is True, the function will block for the specified
                 time (in seconds) until count messages are fetched. If None,
                 it will block forever.
        """
        messages = []

        # Give a size hint to the consumers. Each consumer process will fetch
        # a maximum of "count" messages. This will fetch more messages than
        # necessary, but these will not be committed to kafka. Also, the extra
        # messages can be provided in subsequent runs
        self.size.value = count
        self.pause.clear()

        if timeout is not None:
            max_time = time.time() + timeout

        new_offsets = {}
        while count > 0 and (timeout is None or timeout > 0):
            # Trigger consumption only if the queue is empty
            # By doing this, we will ensure that consumers do not
            # go into overdrive and keep consuming thousands of
            # messages when the user might need only a few
            if self.queue.empty():
                self.start.set()

            try:
                partition, message = self.queue.get(block, timeout)
            except Empty:
                break

            messages.append(message)
            new_offsets[partition] = message.offset + 1
            count -= 1
            if timeout is not None:
                timeout = max_time - time.time()

        self.size.value = 0
        self.start.clear()
        self.pause.set()

        # Update and commit offsets if necessary
        self.offsets.update(new_offsets)
        self.count_since_commit += len(messages)
        self._auto_commit()

        return messages