Delete hourly offsets from offsets table
Pre Hourly processor fails if offsets recorded in kafka_offsets table no longer exist in kafka. This change deletes the offsets from kafka_offsets table, so that the pre hourly processor can resume processing with the next run. Change-Id: I017c271e630fdf6de05a73b3bfcb14f5ed18615f
This commit is contained in:
parent
d725b41817
commit
c189feeb8b
|
@ -579,5 +579,9 @@ def invoke():
|
|||
|
||||
MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)
|
||||
|
||||
# delete pre hourly processor offsets
|
||||
if cfg.CONF.stage_processors.pre_hourly_processor_enabled:
|
||||
PreHourlyProcessor.reset_kafka_offsets()
|
||||
|
||||
if __name__ == "__main__":
|
||||
invoke()
|
||||
|
|
|
@ -84,8 +84,11 @@ class PreHourlyProcessor(Processor):
|
|||
batch_time_info)
|
||||
|
||||
@staticmethod
|
||||
def reset_kafka_offsets(app_name):
|
||||
def reset_kafka_offsets():
|
||||
"""delete all offsets from the offset specification."""
|
||||
|
||||
app_name = PreHourlyProcessor.get_app_name()
|
||||
|
||||
# get the offsets from global var
|
||||
offset_specs = simport.load(cfg.CONF.repositories.offsets)()
|
||||
offset_specs.delete_all_kafka_offsets(app_name)
|
||||
|
|
Loading…
Reference in New Issue