diff --git a/monasca_transform/component/usage/calculate_rate.py b/monasca_transform/component/usage/calculate_rate.py
index 5f13fb5..c771515 100644
--- a/monasca_transform/component/usage/calculate_rate.py
+++ b/monasca_transform/component/usage/calculate_rate.py
@@ -96,14 +96,22 @@ class CalculateRate(UsageComponent):
             ((latest_quantity - oldest_quantity) / oldest_quantity) * 100
 
         # create a new instance usage dict
-        instance_usage_dict = {"tenant_id": "all",
-                               "user_id": "all",
-                               "resource_uuid": "all",
-                               "geolocation": "all",
-                               "region": "all",
-                               "zone": "all",
-                               "host": "all",
-                               "project_id": "all",
+        instance_usage_dict = {"tenant_id":
+                               latest_dict.get("tenant_id", "all"),
+                               "user_id":
+                               latest_dict.get("user_id", "all"),
+                               "resource_uuid":
+                               latest_dict.get("resource_uuid", "all"),
+                               "geolocation":
+                               latest_dict.get("geolocation", "all"),
+                               "region":
+                               latest_dict.get("region", "all"),
+                               "zone":
+                               latest_dict.get("zone", "all"),
+                               "host":
+                               latest_dict.get("host", "all"),
+                               "project_id":
+                               latest_dict.get("project_id", "all"),
                                "aggregated_metric_name":
                                    aggregated_metric_name,
                                "quantity": rate_percentage,
@@ -118,9 +126,13 @@ class CalculateRate(UsageComponent):
                                "record_count": oldest_dict["record_count"] +
                                    latest_dict["record_count"],
                                "service_group":
-                                   Component.DEFAULT_UNAVAILABLE_VALUE,
+                                   latest_dict.get("service_group",
+                                                   Component.
+                                                   DEFAULT_UNAVAILABLE_VALUE),
                                "service_id":
-                                   Component.DEFAULT_UNAVAILABLE_VALUE,
+                                   latest_dict.get("service_id",
+                                                   Component.
+                                                   DEFAULT_UNAVAILABLE_VALUE),
                                "usage_date": latest_dict["usage_date"],
                                "usage_hour": latest_dict["usage_hour"],
                                "usage_minute": latest_dict["usage_minute"],
diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py
index 249c458..b146ee1 100644
--- a/monasca_transform/driver/mon_metrics_kafka.py
+++ b/monasca_transform/driver/mon_metrics_kafka.py
@@ -44,6 +44,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
 from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
 
 from monasca_transform.transform import RddTransformContext
+from monasca_transform.transform.storage_utils import \
+    InvalidCacheStorageLevelException
 from monasca_transform.transform.storage_utils import StorageUtils
 from monasca_transform.transform.transform_utils import MonMetricUtils
 from monasca_transform.transform import TransformContextUtils
@@ -454,8 +456,14 @@ class MonMetricsKafkaProcessor(object):
         if cfg.CONF.service.enable_record_store_df_cache:
             storage_level_prop = \
                 cfg.CONF.service.record_store_df_cache_storage_level
-            storage_level = StorageUtils.get_storage_level(
-                storage_level_prop)
+            try:
+                storage_level = StorageUtils.get_storage_level(
+                    storage_level_prop)
+            except InvalidCacheStorageLevelException as storage_error:
+                storage_error.value += \
+                    " (as specified in " \
+                    "service.record_store_df_cache_storage_level)"
+                raise
             record_store_df.persist(storage_level)
 
         #
diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py
index e5bcb85..80a34ae 100644
--- a/monasca_transform/processor/pre_hourly_processor.py
+++ b/monasca_transform/processor/pre_hourly_processor.py
@@ -31,6 +31,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
 from monasca_transform.data_driven_specs.data_driven_specs_repo \
     import DataDrivenSpecsRepoFactory
 from monasca_transform.processor import Processor
+from monasca_transform.transform.storage_utils import \
+    InvalidCacheStorageLevelException
 from monasca_transform.transform.storage_utils import StorageUtils
 from monasca_transform.transform.transform_utils import InstanceUsageUtils
 from monasca_transform.transform import TransformContextUtils
@@ -416,8 +418,15 @@ class PreHourlyProcessor(Processor):
             storage_level_prop = \
                 cfg.CONF.pre_hourly_processor\
                 .instance_usage_df_cache_storage_level
-            storage_level = StorageUtils.get_storage_level(
-                storage_level_prop)
+            try:
+                storage_level = StorageUtils.get_storage_level(
+                    storage_level_prop)
+            except InvalidCacheStorageLevelException as storage_error:
+                storage_error.value += \
+                    " (as specified in " \
+                    "pre_hourly_processor.instance_usage_df" \
+                    "_cache_storage_level)"
+                raise
             instance_usage_df.persist(storage_level)
 
         # aggregate pre hourly data
diff --git a/monasca_transform/transform/storage_utils.py b/monasca_transform/transform/storage_utils.py
index 34eb68f..2bfcab0 100644
--- a/monasca_transform/transform/storage_utils.py
+++ b/monasca_transform/transform/storage_utils.py
@@ -15,6 +15,19 @@
 from pyspark import StorageLevel
 
 
+class InvalidCacheStorageLevelException(Exception):
+    """Exception thrown when an invalid cache storage level is encountered
+    Attributes:
+        value: string representing the error
+    """
+
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return repr(self.value)
+
+
 class StorageUtils(object):
     """storage util functions"""
 
@@ -46,4 +59,5 @@ class StorageUtils(object):
         elif (storage_level_str == "OFF_HEAP"):
             return StorageLevel.OFF_HEAP
         else:
-            return StorageLevel.MEMORY_ONLY
+            raise InvalidCacheStorageLevelException(
+                "Unrecognized cache storage level: %s" % storage_level_str)
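
For reference, a minimal sketch of how callers are expected to consume the new InvalidCacheStorageLevelException, mirroring the try/except pattern in the driver and pre-hourly processor hunks above. The storage level string and the config option named in the message below are made up for illustration; in the real code both come from oslo.config options:

    from monasca_transform.transform.storage_utils import \
        InvalidCacheStorageLevelException
    from monasca_transform.transform.storage_utils import StorageUtils

    # "NOT_A_LEVEL" is a deliberately unrecognized value, so
    # get_storage_level() now raises InvalidCacheStorageLevelException
    # instead of silently falling back to MEMORY_ONLY
    try:
        storage_level = StorageUtils.get_storage_level("NOT_A_LEVEL")
    except InvalidCacheStorageLevelException as storage_error:
        # append which (hypothetical) config option supplied the bad value,
        # then re-raise so the misconfiguration is still fatal
        storage_error.value += \
            " (as specified in example_group.example_cache_storage_level)"
        raise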