diff --git a/monasca_notification/processors/kafka_consumer.py b/monasca_notification/processors/kafka_consumer.py
index d2946a8..a2be697 100644
--- a/monasca_notification/processors/kafka_consumer.py
+++ b/monasca_notification/processors/kafka_consumer.py
@@ -113,14 +113,30 @@ class KafkaConsumer(BaseProcessor):
 
                     self._consumer.fetch_offsets = partitioned_fetch_offsets
 
-                    for message in self._consumer:
-                        if not set_partitioner.acquired:
-                            break
-                        consumed_from_kafka += 1
-                        log.debug("Consuming message from kafka, "
-                                  "partition {}, offset {}".format(message[0],
-                                                                   message[1].offset))
-                        yield message
+                    # When Kafka resizes the partitions it's possible that it
+                    # will remove data at our current offset. When this
+                    # happens the next attempt to read from Kafka will generate
+                    # an OffsetOutOfRangeError. We trap this error and seek to
+                    # the head of the current Kafka data. Because this error
+                    # only happens when Kafka removes the data we're currently
+                    # pointing at, we're guaranteed not to read any duplicate
+                    # data; however, we will lose any information between our
+                    # current offset and the new Kafka head.
+
+                    try:
+                        for message in self._consumer:
+                            if not set_partitioner.acquired:
+                                break
+                            consumed_from_kafka += 1
+
+                            log.debug("Consuming message from kafka, "
+                                      "partition {}, offset {}".
+                                      format(message[0], message[1].offset))
+
+                            yield message
+                    except kafka.common.OffsetOutOfRangeError:
+                        log.error("Kafka OffsetOutOfRange. Jumping to head.")
+                        self._consumer.seek(0, 0)
 
             elif set_partitioner.allocating:
                 log.info("Waiting to acquire locks on partition set")
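
A minimal sketch of the recovery pattern this patch introduces, assuming the
legacy kafka-python SimpleConsumer API used here (kafka.KafkaClient,
kafka.SimpleConsumer, kafka.common.OffsetOutOfRangeError); the broker address,
consumer group, and topic below are placeholders, not values from this change:

    import kafka
    import kafka.common

    client = kafka.KafkaClient("localhost:9092")  # placeholder broker address
    consumer = kafka.SimpleConsumer(client, "notification", "alarms")

    def consume():
        try:
            for message in consumer:
                yield message
        except kafka.common.OffsetOutOfRangeError:
            # Kafka dropped the data at our stored offset (retention or a
            # partition resize); seek(0, 0) resumes from the earliest offset
            # still available, skipping the gap rather than re-reading data.
            consumer.seek(0, 0)

Because the error is only raised when the data at the current offset no longer
exists, seeking to the head cannot replay messages already consumed; it only
skips the range Kafka has already discarded.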