Merge "Corrected catch up aggregation logic"
This commit is contained in:
commit
e6f7057786
@ -38,56 +38,63 @@ class PreHourlyProcessorUtil(object):
|
||||
return PreHourlyProcessorUtil.data_provider
|
||||
|
||||
@staticmethod
|
||||
def is_time_to_run(check_time):
|
||||
def is_time_to_run(check_date_time):
|
||||
"""return True if its time to run this processor.
|
||||
For now it just checks to see if its start of the hour
|
||||
i.e. the minute is 00.
|
||||
It is time to run the processor if:
|
||||
The processor has no previous recorded run time.
|
||||
It is more than the configured 'late_metric_slack_time' (to allow
|
||||
for the arrival of tardy metrics) past the hour and the processor
|
||||
has not yet run for this hour
|
||||
"""
|
||||
this_hour = int(datetime.datetime.strftime(check_time, '%H'))
|
||||
this_date = check_time.replace(minute=0, second=0,
|
||||
microsecond=0, hour=0)
|
||||
drift_delta = datetime.timedelta(
|
||||
|
||||
check_hour = int(datetime.datetime.strftime(check_date_time, '%H'))
|
||||
check_date = check_date_time.replace(minute=0, second=0,
|
||||
microsecond=0, hour=0)
|
||||
slack = datetime.timedelta(
|
||||
seconds=cfg.CONF.pre_hourly_processor.late_metric_slack_time)
|
||||
|
||||
top_of_the_hour = check_time.replace(minute=0, second=0,
|
||||
microsecond=0)
|
||||
earliest_acceptable_run_time = top_of_the_hour + drift_delta
|
||||
last_processed = PreHourlyProcessorUtil.get_last_processed()
|
||||
if last_processed:
|
||||
hour_last_processed = int(
|
||||
top_of_the_hour_date_time = check_date_time.replace(
|
||||
minute=0, second=0, microsecond=0)
|
||||
earliest_acceptable_run_date_time = top_of_the_hour_date_time + slack
|
||||
last_processed_date_time = PreHourlyProcessorUtil.get_last_processed()
|
||||
if last_processed_date_time:
|
||||
last_processed_hour = int(
|
||||
datetime.datetime.strftime(
|
||||
last_processed, '%H'))
|
||||
date_last_processed = last_processed.replace(minute=0, second=0,
|
||||
microsecond=0,
|
||||
hour=0)
|
||||
last_processed_date_time, '%H'))
|
||||
last_processed_date = last_processed_date_time.replace(
|
||||
minute=0, second=0, microsecond=0, hour=0)
|
||||
else:
|
||||
date_last_processed = None
|
||||
hour_last_processed = None
|
||||
last_processed_date = None
|
||||
last_processed_hour = None
|
||||
|
||||
if this_hour == hour_last_processed:
|
||||
earliest_acceptable_run_time = (
|
||||
top_of_the_hour +
|
||||
if (check_hour == last_processed_hour
|
||||
and last_processed_date == check_date):
|
||||
earliest_acceptable_run_date_time = (
|
||||
top_of_the_hour_date_time +
|
||||
datetime.timedelta(hours=1) +
|
||||
drift_delta
|
||||
slack
|
||||
)
|
||||
log.debug(
|
||||
"Pre-hourly task check: Check time = %s, "
|
||||
"Pre-hourly task check: Now date: %s, "
|
||||
"Date last processed: %s, Check time = %s, "
|
||||
"Last processed at %s (hour = %s), "
|
||||
"Earliest acceptable run time %s "
|
||||
"(based on configured pre hourly late metrics slack time of %s "
|
||||
"seconds)" % (
|
||||
check_time,
|
||||
last_processed,
|
||||
hour_last_processed,
|
||||
earliest_acceptable_run_time,
|
||||
check_date,
|
||||
last_processed_date,
|
||||
check_date_time,
|
||||
last_processed_date_time,
|
||||
last_processed_hour,
|
||||
earliest_acceptable_run_date_time,
|
||||
cfg.CONF.pre_hourly_processor.late_metric_slack_time
|
||||
))
|
||||
# run pre hourly processor only once from the
|
||||
# configured time after the top of the hour
|
||||
if (not hour_last_processed or (
|
||||
((not this_hour == hour_last_processed) or
|
||||
(this_date > date_last_processed)) and
|
||||
check_time >= earliest_acceptable_run_time)):
|
||||
if (not last_processed_date_time or (
|
||||
((not check_hour == last_processed_hour) or
|
||||
(check_date > last_processed_date)) and
|
||||
check_date_time >= earliest_acceptable_run_date_time)):
|
||||
log.debug("Pre-hourly: Yes, it's time to process")
|
||||
return True
|
||||
log.debug("Pre-hourly: No, it's NOT time to process")
|
||||
|
@ -15,6 +15,12 @@ import datetime
|
||||
import unittest
|
||||
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
|
||||
ConfigInitializer.basic_config(
|
||||
default_config_files=[
|
||||
'tests/unit/test_resources/config/'
|
||||
'test_config.conf']
|
||||
)
|
||||
from monasca_transform.processor.processor_util import PreHourlyProcessorUtil
|
||||
from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
||||
|
||||
@ -22,11 +28,7 @@ from monasca_transform.processor.processor_util import ProcessUtilDataProvider
|
||||
class PreHourlyProcessorTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
ConfigInitializer.basic_config(
|
||||
default_config_files=[
|
||||
'tests/unit/test_resources/config/'
|
||||
'test_config.conf']
|
||||
)
|
||||
pass
|
||||
|
||||
def test_is_time_to_run_before_late_metric_slack_time(self):
|
||||
check_time = datetime.datetime(
|
||||
@ -70,6 +72,15 @@ class PreHourlyProcessorTest(unittest.TestCase):
|
||||
date_time=(check_time + datetime.timedelta(hours=-1)))
|
||||
self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
|
||||
|
||||
def test_after_midnight_having_already_run(
|
||||
self):
|
||||
check_time = datetime.datetime(
|
||||
year=2016, month=11, day=7, hour=0,
|
||||
minute=20, second=0, microsecond=1)
|
||||
PreHourlyProcessorUtil.get_data_provider().set_last_processed(
|
||||
date_time=(check_time + datetime.timedelta(minutes=-10)))
|
||||
self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
|
||||
|
||||
def test_am_pm_behaviour(self):
|
||||
check_time = datetime.datetime(
|
||||
year=2016, month=11, day=7, hour=22,
|
||||
|
Loading…
Reference in New Issue
Block a user