Corrected catch up aggregation logic
Fixed a bug where the hourly agregation would run at every iteration if the hour is zero (midnight) because zero is falsey. Change-Id: I9652f02aea30f3ddb6f154db716aa4057455be06
This commit is contained in:
parent
b7bec6f902
commit
00b4797a65
|
@ -38,56 +38,63 @@ class PreHourlyProcessorUtil(object):
|
||||||
return PreHourlyProcessorUtil.data_provider
|
return PreHourlyProcessorUtil.data_provider
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_time_to_run(check_time):
|
def is_time_to_run(check_date_time):
|
||||||
"""return True if its time to run this processor.
|
"""return True if its time to run this processor.
|
||||||
For now it just checks to see if its start of the hour
|
It is time to run the processor if:
|
||||||
i.e. the minute is 00.
|
The processor has no previous recorded run time.
|
||||||
|
It is more than the configured 'late_metric_slack_time' (to allow
|
||||||
|
for the arrival of tardy metrics) past the hour and the processor
|
||||||
|
has not yet run for this hour
|
||||||
"""
|
"""
|
||||||
this_hour = int(datetime.datetime.strftime(check_time, '%H'))
|
|
||||||
this_date = check_time.replace(minute=0, second=0,
|
check_hour = int(datetime.datetime.strftime(check_date_time, '%H'))
|
||||||
microsecond=0, hour=0)
|
check_date = check_date_time.replace(minute=0, second=0,
|
||||||
drift_delta = datetime.timedelta(
|
microsecond=0, hour=0)
|
||||||
|
slack = datetime.timedelta(
|
||||||
seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time)
|
seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time)
|
||||||
|
|
||||||
top_of_the_hour = check_time.replace(minute=0, second=0,
|
top_of_the_hour_date_time = check_date_time.replace(
|
||||||
microsecond=0)
|
minute=0, second=0, microsecond=0)
|
||||||
earliest_acceptable_run_time = top_of_the_hour + drift_delta
|
earliest_acceptable_run_date_time = top_of_the_hour_date_time + slack
|
||||||
last_processed = PreHourlyProcessorUtil.get_last_processed()
|
last_processed_date_time = PreHourlyProcessorUtil.get_last_processed()
|
||||||
if last_processed:
|
if last_processed_date_time:
|
||||||
hour_last_processed = int(
|
last_processed_hour = int(
|
||||||
datetime.datetime.strftime(
|
datetime.datetime.strftime(
|
||||||
last_processed, '%H'))
|
last_processed_date_time, '%H'))
|
||||||
date_last_processed = last_processed.replace(minute=0, second=0,
|
last_processed_date = last_processed_date_time.replace(
|
||||||
microsecond=0,
|
minute=0, second=0, microsecond=0, hour=0)
|
||||||
hour=0)
|
|
||||||
else:
|
else:
|
||||||
date_last_processed = None
|
last_processed_date = None
|
||||||
hour_last_processed = None
|
last_processed_hour = None
|
||||||
|
|
||||||
if this_hour == hour_last_processed:
|
if (check_hour == last_processed_hour
|
||||||
earliest_acceptable_run_time = (
|
and last_processed_date == check_date):
|
||||||
top_of_the_hour +
|
earliest_acceptable_run_date_time = (
|
||||||
|
top_of_the_hour_date_time +
|
||||||
datetime.timedelta(hours=1) +
|
datetime.timedelta(hours=1) +
|
||||||
drift_delta
|
slack
|
||||||
)
|
)
|
||||||
log.debug(
|
log.debug(
|
||||||
"Pre-hourly task check: Check time = %s, "
|
"Pre-hourly task check: Now date: %s, "
|
||||||
|
"Date last processed: %s, Check time = %s, "
|
||||||
"Last processed at %s (hour = %s), "
|
"Last processed at %s (hour = %s), "
|
||||||
"Earliest acceptable run time %s "
|
"Earliest acceptable run time %s "
|
||||||
"(based on configured pre hourly late metrics slack time of %s "
|
"(based on configured pre hourly late metrics slack time of %s "
|
||||||
"seconds)" % (
|
"seconds)" % (
|
||||||
check_time,
|
check_date,
|
||||||
last_processed,
|
last_processed_date,
|
||||||
hour_last_processed,
|
check_date_time,
|
||||||
earliest_acceptable_run_time,
|
last_processed_date_time,
|
||||||
|
last_processed_hour,
|
||||||
|
earliest_acceptable_run_date_time,
|
||||||
cfg.CONF.pre_hourly_processor.late_metric_slack_time
|
cfg.CONF.pre_hourly_processor.late_metric_slack_time
|
||||||
))
|
))
|
||||||
# run pre hourly processor only once from the
|
# run pre hourly processor only once from the
|
||||||
# configured time after the top of the hour
|
# configured time after the top of the hour
|
||||||
if (not hour_last_processed or (
|
if (not last_processed_date_time or (
|
||||||
((not this_hour == hour_last_processed) or
|
((not check_hour == last_processed_hour) or
|
||||||
(this_date > date_last_processed)) and
|
(check_date > last_processed_date)) and
|
||||||
check_time >= earliest_acceptable_run_time)):
|
check_date_time >= earliest_acceptable_run_date_time)):
|
||||||
log.debug("Pre-hourly: Yes, it's time to process")
|
log.debug("Pre-hourly: Yes, it's time to process")
|
||||||
return True
|
return True
|
||||||
log.debug("Pre-hourly: No, it's NOT time to process")
|
log.debug("Pre-hourly: No, it's NOT time to process")
|
||||||
|
|
|
@ -15,6 +15,12 @@ import datetime
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||||
|
|
||||||
|
ConfigInitializer.basic_config(
|
||||||
|
default_config_files=[
|
||||||
|
'tests/unit/test_resources/config/'
|
||||||
|
'test_config.conf']
|
||||||
|
)
|
||||||
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
|
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
|
||||||
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
||||||
|
|
||||||
|
@ -22,11 +28,7 @@ from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
||||||
class PreHourlyProcessorTest(unittest.TestCase):
|
class PreHourlyProcessorTest(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
ConfigInitializer.basic_config(
|
pass
|
||||||
default_config_files=[
|
|
||||||
'tests/unit/test_resources/config/'
|
|
||||||
'test_config.conf']
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_is_time_to_run_before_late_metric_slack_time(self):
|
def test_is_time_to_run_before_late_metric_slack_time(self):
|
||||||
check_time = datetime.datetime(
|
check_time = datetime.datetime(
|
||||||
|
@ -70,6 +72,15 @@ class PreHourlyProcessorTest(unittest.TestCase):
|
||||||
date_time=(check_time + datetime.timedelta(hours=-1)))
|
date_time=(check_time + datetime.timedelta(hours=-1)))
|
||||||
self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
|
self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
|
||||||
|
|
||||||
|
def test_after_midnight_having_already_run(
|
||||||
|
self):
|
||||||
|
check_time = datetime.datetime(
|
||||||
|
year=2016, month=11, day=7, hour=0,
|
||||||
|
minute=20, second=0, microsecond=1)
|
||||||
|
PreHourlyProcessorUtil.get_data_provider().set_last_processed(
|
||||||
|
date_time=(check_time + datetime.timedelta(minutes=-10)))
|
||||||
|
self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
|
||||||
|
|
||||||
def test_am_pm_behaviour(self):
|
def test_am_pm_behaviour(self):
|
||||||
check_time = datetime.datetime(
|
check_time = datetime.datetime(
|
||||||
year=2016, month=11, day=7, hour=22,
|
year=2016, month=11, day=7, hour=22,
|
||||||
|
|
Loading…
Reference in New Issue