diff --git a/devstack/files/monasca-transform/monasca-transform.conf b/devstack/files/monasca-transform/monasca-transform.conf
index b585b95..749d1b5 100644
--- a/devstack/files/monasca-transform/monasca-transform.conf
+++ b/devstack/files/monasca-transform/monasca-transform.conf
@@ -26,6 +26,7 @@ pre_hourly_processor_enabled = True
 [pre_hourly_processor]
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 #
 # Configurable values for the monasca-transform service
diff --git a/etc/monasca-transform.conf b/etc/monasca-transform.conf
index 9a5dd6c..72a2084 100644
--- a/etc/monasca-transform.conf
+++ b/etc/monasca-transform.conf
@@ -26,6 +26,7 @@ enable_pre_hourly_processor = True
 [pre_hourly_processor]
 enable_instance_usage_df_cache = True
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 #
 # Configurable values for the monasca-transform service
diff --git a/monasca_transform/config/config_initializer.py b/monasca_transform/config/config_initializer.py
index 9bc9aa4..328c1ab 100644
--- a/monasca_transform/config/config_initializer.py
+++ b/monasca_transform/config/config_initializer.py
@@ -135,7 +135,8 @@ class ConfigInitializer(object):
     def load_pre_hourly_processor_options():
         app_opts = [
             cfg.BoolOpt('enable_instance_usage_df_cache'),
-            cfg.StrOpt('instance_usage_df_cache_storage_level')
+            cfg.StrOpt('instance_usage_df_cache_storage_level'),
+            cfg.BoolOpt('enable_batch_time_filtering')
         ]
         app_group = cfg.OptGroup(name='pre_hourly_processor',
                                  title='pre_hourly_processor')
diff --git a/monasca_transform/mysql_offset_specs.py b/monasca_transform/mysql_offset_specs.py
index 7a42f9d..ef7d925 100644
--- a/monasca_transform/mysql_offset_specs.py
+++ b/monasca_transform/mysql_offset_specs.py
@@ -88,6 +88,22 @@ class MySQLOffsetSpecs(OffsetSpecs):
                 MySQLOffsetSpec.app_name == app_name,
                 MySQLOffsetSpec.revision == 1).all()}
 
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        try:
+            # get partition 0 as a representative of all others
+            offset = self.session.query(MySQLOffsetSpec).filter(
+                MySQLOffsetSpec.app_name == app_name,
+                MySQLOffsetSpec.topic == topic,
+                MySQLOffsetSpec.partition == 0,
+                MySQLOffsetSpec.revision == 1).one()
+            most_recent_batch_time = datetime.datetime.strptime(
+                offset.get_batch_time(),
+                '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            most_recent_batch_time = None
+
+        return most_recent_batch_time
+
     def delete_all_kafka_offsets(self, app_name):
         try:
             self.session.query(MySQLOffsetSpec).filter(
diff --git a/monasca_transform/offset_specs.py b/monasca_transform/offset_specs.py
index e7983f6..b1d3b8a 100644
--- a/monasca_transform/offset_specs.py
+++ b/monasca_transform/offset_specs.py
@@ -98,6 +98,13 @@ class OffsetSpecs(object):
             "Class %s doesn't implement delete_all_kafka_offsets()"
             % self.__class__.__name__)
 
+    @abc.abstractmethod
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        raise NotImplementedError(
+            "Class %s doesn't implement "
+            "get_most_recent_batch_time_from_offsets()"
+            % self.__class__.__name__)
+
 
 class JSONOffsetSpecs(OffsetSpecs):
 
@@ -190,6 +197,19 @@ class JSONOffsetSpecs(OffsetSpecs):
     def get_kafka_offsets(self, app_name):
         return self._kafka_offsets
 
+    def get_most_recent_batch_time_from_offsets(self, app_name, topic):
+        try:
+            # get partition 0 as a representative of all others
+            key = "%s_%s_%s" % (app_name, topic, 0)
+            offset = self._kafka_offsets[key]
+            most_recent_batch_time = datetime.datetime.strptime(
+                offset.get_batch_time(),
+                '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            most_recent_batch_time = None
+
+        return most_recent_batch_time
+
     def delete_all_kafka_offsets(self, app_name):
         log.info("Deleting json offsets file: %s", self.kafka_offset_spec_file)
         os.remove(self.kafka_offset_spec_file)
diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py
index e9fa3c8..60a3d1b 100644
--- a/monasca_transform/processor/pre_hourly_processor.py
+++ b/monasca_transform/processor/pre_hourly_processor.py
@@ -244,7 +244,7 @@ class PreHourlyProcessor(Processor):
         available in kafka.
         """
 
-        offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
+        offset_specifications = PreHourlyProcessor.get_offset_specs()
 
         # get application name, will be used to get offsets from database
         app_name = PreHourlyProcessor.get_app_name()
@@ -277,6 +277,12 @@ class PreHourlyProcessor(Processor):
                 saved_offset_spec)
         return offset_range_list
 
+    @staticmethod
+    def get_offset_specs():
+        """get offset specifications.
+        """
+        return simport.load(cfg.CONF.repositories.offsets)()
+
     @staticmethod
     def fetch_pre_hourly_data(spark_context,
                               offset_range_list):
@@ -305,6 +311,45 @@ class PreHourlyProcessor(Processor):
         instance_usage_df = InstanceUsageUtils.create_df_from_json_rdd(
             sqlc,
             instance_usage_rdd)
+        if cfg.CONF.pre_hourly_processor.enable_batch_time_filtering:
+            instance_usage_df = (
+                PreHourlyProcessor.filter_out_records_not_in_current_batch(
+                    instance_usage_df))
+
+        return instance_usage_df
+
+    @staticmethod
+    def filter_out_records_not_in_current_batch(instance_usage_df):
+        """Filter out any records which don't pertain to the
+        current batch (i.e., records before or after the
+        batch currently being processed).
+        """
+        # get the most recent batch time from the stored offsets
+
+        offset_specifications = PreHourlyProcessor.get_offset_specs()
+        app_name = PreHourlyProcessor.get_app_name()
+        topic = PreHourlyProcessor.get_kafka_topic()
+        most_recent_batch_time = (
+            offset_specifications.get_most_recent_batch_time_from_offsets(
+                app_name, topic))
+
+        if most_recent_batch_time:
+            # filter out records before current batch
+            instance_usage_df = instance_usage_df.filter(
+                instance_usage_df.lastrecord_timestamp_string >=
+                most_recent_batch_time)
+
+        # determine the timestamp of the most recent top-of-the-hour (which
+        # is the end of the current batch).
+        current_time = datetime.datetime.now()
+        truncated_timestamp_to_current_hour = current_time.replace(
+            minute=0, second=0, microsecond=0)
+
+        # filter out records after current batch
+        instance_usage_df = instance_usage_df.filter(
+            instance_usage_df.firstrecord_timestamp_string <
+            truncated_timestamp_to_current_hour)
+
         return instance_usage_df
 
     @staticmethod
diff --git a/tests/unit/processor/test_pre_hourly_processor_agg.py b/tests/unit/processor/test_pre_hourly_processor_agg.py
index 0639747..b9a8e10 100644
--- a/tests/unit/processor/test_pre_hourly_processor_agg.py
+++ b/tests/unit/processor/test_pre_hourly_processor_agg.py
@@ -12,7 +12,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 import mock
+import os
+import random
+import sys
 import unittest
+import uuid
 
 from oslo_config import cfg
 from pyspark.streaming.kafka import OffsetRange
@@ -26,9 +30,13 @@ from tests.unit.spark_context_test import SparkContextTest
 from tests.unit.test_resources.metrics_pre_hourly_data.data_provider \
     import DataProvider
 
+from monasca_transform.offset_specs import JSONOffsetSpecs
+
 
 class TestPreHourlyProcessorAgg(SparkContextTest):
 
+    test_resources_path = 'tests/unit/test_resources'
+
     def setUp(self):
         super(TestPreHourlyProcessorAgg, self).setUp()
         # configure the system with a dummy messaging adapter
@@ -43,19 +51,44 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
 
     @mock.patch('monasca_transform.processor.pre_hourly_processor.KafkaInsert',
                 DummyInsert)
+    @mock.patch('monasca_transform.processor.pre_hourly_processor.'
+                'PreHourlyProcessor.get_offset_specs')
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                 'PreHourlyProcessor.fetch_pre_hourly_data')
     @mock.patch('monasca_transform.processor.pre_hourly_processor.'
                 'PreHourlyProcessor.get_processing_offset_range_list')
     def test_pre_hourly_processor(self,
                                   offset_range_list,
-                                  pre_hourly_data):
+                                  pre_hourly_data,
+                                  offset_specs):
 
         # load components
         myOffsetRanges = [
-            OffsetRange("metrics_pre_hourly", 1, 10, 20)]
+            OffsetRange("metrics_pre_hourly", 0, 10, 20)]
         offset_range_list.return_value = myOffsetRanges
 
+        filename = '%s.json' % str(uuid.uuid4())
+        file_path = os.path.join(self.test_resources_path, filename)
+        json_offset_specs = JSONOffsetSpecs(
+            path=self.test_resources_path,
+            filename=filename
+        )
+        app_name = "mon_metrics_kafka_pre_hourly"
+        topic = "metrics_pre_hourly"
+        partition = 0
+        until_offset = random.randint(0, sys.maxsize)
+        from_offset = random.randint(0, sys.maxsize)
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        json_offset_specs.add(topic=topic, partition=partition,
+                              app_name=app_name,
+                              from_offset=from_offset,
+                              until_offset=until_offset,
+                              batch_time_info=my_batch_time)
+
+        offset_specs.return_value = json_offset_specs
+
         # Create an RDD out of the mocked instance usage data
         with open(DataProvider.metrics_pre_hourly_data_path) as f:
             raw_lines = f.read().splitlines()
@@ -167,13 +200,14 @@ class TestPreHourlyProcessorAgg(SparkContextTest):
                          swift_disk_rate_agg_metric.get('metric')
                          .get('dimensions').get('aggregation_period'))
 
+        os.remove(file_path)
+
     def simple_count_transform(self, rdd):
         return rdd.count()
 
 
 if __name__ == "__main__":
     print("PATH *************************************************************")
-    import sys
     print(sys.path)
     print("PATH==============================================================")
     unittest.main()
diff --git a/tests/unit/test_json_kafka_offsets.py b/tests/unit/test_json_kafka_offsets.py
index 81b1c60..818dff5 100644
--- a/tests/unit/test_json_kafka_offsets.py
+++ b/tests/unit/test_json_kafka_offsets.py
@@ -234,6 +234,36 @@ class TestJSONOffsetSpecs(unittest.TestCase):
             offset_value_updated.get('until_offset'))
         os.remove(file_path)
 
+    def test_get_most_recent_batch_time(self):
+        filename = '%s.json' % str(uuid.uuid4())
+        file_path = os.path.join(self.test_resources_path, filename)
+        json_offset_specs = JSONOffsetSpecs(
+            path=self.test_resources_path,
+            filename=filename
+        )
+        app_name = "mon_metrics_kafka"
+
+        topic_1 = str(uuid.uuid4())
+        partition_1 = 0
+        until_offset_1 = random.randint(0, sys.maxsize)
+        from_offset_1 = random.randint(0, sys.maxsize)
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        json_offset_specs.add(topic=topic_1, partition=partition_1,
+                              app_name=app_name,
+                              from_offset=from_offset_1,
+                              until_offset=until_offset_1,
+                              batch_time_info=my_batch_time)
+
+        most_recent_batch_time = (
+            json_offset_specs.get_most_recent_batch_time_from_offsets(
+                app_name, topic_1))
+
+        self.assertEqual(most_recent_batch_time, my_batch_time)
+
+        os.remove(file_path)
+
     def load_offset_file_as_json(self, file_path):
         with open(file_path, 'r') as f:
             json_file = json.load(f)
diff --git a/tests/unit/test_mysql_kafka_offsets.py b/tests/unit/test_mysql_kafka_offsets.py
index ab78716..25ab579 100644
--- a/tests/unit/test_mysql_kafka_offsets.py
+++ b/tests/unit/test_mysql_kafka_offsets.py
@@ -131,6 +131,27 @@ class TestMySQLOffsetSpecs(unittest.TestCase):
         self.assertEqual(until_offset_2,
                          updated_offset_value.get_until_offset())
 
+    def test_get_most_recent_batch_time(self):
+        topic_1 = str(uuid.uuid4())
+        partition_1 = 0
+        until_offset_1 = random.randint(0, sys.maxsize)
+        from_offset_1 = random.randint(0, sys.maxsize)
+        app_name_1 = str(uuid.uuid4())
+
+        my_batch_time = self.get_dummy_batch_time()
+
+        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
+                                    app_name=app_name_1,
+                                    from_offset=from_offset_1,
+                                    until_offset=until_offset_1,
+                                    batch_time_info=my_batch_time)
+
+        most_recent_batch_time = (
+            self.kafka_offset_specs.get_most_recent_batch_time_from_offsets(
+                app_name_1, topic_1))
+
+        self.assertEqual(most_recent_batch_time, my_batch_time)
+
     def assertions_on_offset(self, used_value=None, offset_value=None):
         self.assertEqual(used_value.get('topic'),
                          offset_value.get_topic())
diff --git a/tests/unit/test_resources/config/test_config.conf b/tests/unit/test_resources/config/test_config.conf
index f95c5b0..f1c0f37 100644
--- a/tests/unit/test_resources/config/test_config.conf
+++ b/tests/unit/test_resources/config/test_config.conf
@@ -19,6 +19,7 @@ enable_pre_hourly_processor = False
 [pre_hourly_processor]
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 [service]
 enable_record_store_df_cache = False
diff --git a/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf b/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf
index 6ff69a5..610eca6 100644
--- a/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf
+++ b/tests/unit/test_resources/config/test_config_with_dummy_messaging_adapter.conf
@@ -9,6 +9,7 @@ enable_pre_hourly_processor = False
 [pre_hourly_processor]
 enable_instance_usage_df_cache = False
 instance_usage_df_cache_storage_level = MEMORY_ONLY_SER_2
+enable_batch_time_filtering = True
 
 [service]
 enable_record_store_df_cache = False
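Note on the new filtering: with enable_batch_time_filtering turned on, the pre-hourly processor keeps only instance-usage records whose timestamps fall between the batch time saved with the Kafka offsets and the most recent top of the hour. The following is a minimal, Spark-free sketch of that window check; the helper name and the plain-string timestamp comparison are illustrative only (the patch itself filters DataFrame columns) and are not part of this change.

import datetime

# Timestamps use the same '%Y-%m-%d %H:%M:%S' layout the offset store keeps,
# which sorts chronologically when compared as strings.
def in_current_batch(firstrecord_ts, lastrecord_ts, most_recent_batch_time=None):
    # end of the current batch: the most recent top-of-the-hour
    end_of_batch = datetime.datetime.now().replace(
        minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
    if most_recent_batch_time and lastrecord_ts < most_recent_batch_time:
        # record belongs to an earlier, already-processed batch
        return False
    # drop records belonging to the hour that is still in progress
    return firstrecord_ts < end_of_batch

# prints True: the record is newer than the saved batch time (09:00) and
# older than the current top-of-the-hour
print(in_current_batch('2016-06-20 09:15:00', '2016-06-20 09:20:00',
                       '2016-06-20 09:00:00'))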