Added some bulletproofing to catch invalid configuration
entries for caching levels. Also changed the calculate_rate component to use values from instance usage if available (rather than using 'all'). Change-Id: Ibdbc8d57c2566de76051c9277f9c75225546d4d7
This commit is contained in:
parent
82fe3b9199
commit
1c3a7989e7
|
@ -96,14 +96,22 @@ class CalculateRate(UsageComponent):
|
|||
((latest_quantity - oldest_quantity) / oldest_quantity) * 100
|
||||
|
||||
# create a new instance usage dict
|
||||
instance_usage_dict = {"tenant_id": "all",
|
||||
"user_id": "all",
|
||||
"resource_uuid": "all",
|
||||
"geolocation": "all",
|
||||
"region": "all",
|
||||
"zone": "all",
|
||||
"host": "all",
|
||||
"project_id": "all",
|
||||
instance_usage_dict = {"tenant_id":
|
||||
latest_dict.get("tenant_id", "all"),
|
||||
"user_id":
|
||||
latest_dict.get("user_id", "all"),
|
||||
"resource_uuid":
|
||||
latest_dict.get("resource_uuid", "all"),
|
||||
"geolocation":
|
||||
latest_dict.get("geolocation", "all"),
|
||||
"region":
|
||||
latest_dict.get("region", "all"),
|
||||
"zone":
|
||||
latest_dict.get("zone", "all"),
|
||||
"host":
|
||||
latest_dict.get("host", "all"),
|
||||
"project_id":
|
||||
latest_dict.get("project_id", "all"),
|
||||
"aggregated_metric_name":
|
||||
aggregated_metric_name,
|
||||
"quantity": rate_percentage,
|
||||
|
@ -118,9 +126,13 @@ class CalculateRate(UsageComponent):
|
|||
"record_count": oldest_dict["record_count"] +
|
||||
latest_dict["record_count"],
|
||||
"service_group":
|
||||
Component.DEFAULT_UNAVAILABLE_VALUE,
|
||||
latest_dict.get("service_group",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"service_id":
|
||||
Component.DEFAULT_UNAVAILABLE_VALUE,
|
||||
latest_dict.get("service_id",
|
||||
Component.
|
||||
DEFAULT_UNAVAILABLE_VALUE),
|
||||
"usage_date": latest_dict["usage_date"],
|
||||
"usage_hour": latest_dict["usage_hour"],
|
||||
"usage_minute": latest_dict["usage_minute"],
|
||||
|
|
|
@ -44,6 +44,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
|||
from monasca_transform.processor.pre_hourly_processor import PreHourlyProcessor
|
||||
|
||||
from monasca_transform.transform import RddTransformContext
|
||||
from monasca_transform.transform.storage_utils import \
|
||||
InvalidCacheStorageLevelException
|
||||
from monasca_transform.transform.storage_utils import StorageUtils
|
||||
from monasca_transform.transform.transform_utils import MonMetricUtils
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
@ -454,8 +456,14 @@ class MonMetricsKafkaProcessor(object):
|
|||
if cfg.CONF.service.enable_record_store_df_cache:
|
||||
storage_level_prop = \
|
||||
cfg.CONF.service.record_store_df_cache_storage_level
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
try:
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
except InvalidCacheStorageLevelException as storage_error:
|
||||
storage_error.value += \
|
||||
" (as specified in " \
|
||||
"service.record_store_df_cache_storage_level)"
|
||||
raise
|
||||
record_store_df.persist(storage_level)
|
||||
|
||||
#
|
||||
|
|
|
@ -31,6 +31,8 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
|||
from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
||||
import DataDrivenSpecsRepoFactory
|
||||
from monasca_transform.processor import Processor
|
||||
from monasca_transform.transform.storage_utils import \
|
||||
InvalidCacheStorageLevelException
|
||||
from monasca_transform.transform.storage_utils import StorageUtils
|
||||
from monasca_transform.transform.transform_utils import InstanceUsageUtils
|
||||
from monasca_transform.transform import TransformContextUtils
|
||||
|
@ -416,8 +418,15 @@ class PreHourlyProcessor(Processor):
|
|||
storage_level_prop = \
|
||||
cfg.CONF.pre_hourly_processor\
|
||||
.instance_usage_df_cache_storage_level
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
try:
|
||||
storage_level = StorageUtils.get_storage_level(
|
||||
storage_level_prop)
|
||||
except InvalidCacheStorageLevelException as storage_error:
|
||||
storage_error.value += \
|
||||
" (as specified in " \
|
||||
"pre_hourly_processor.instance_usage_df" \
|
||||
"_cache_storage_level)"
|
||||
raise
|
||||
instance_usage_df.persist(storage_level)
|
||||
|
||||
# aggregate pre hourly data
|
||||
|
|
|
@ -15,6 +15,19 @@
|
|||
from pyspark import StorageLevel
|
||||
|
||||
|
||||
class InvalidCacheStorageLevelException(Exception):
|
||||
"""Exception thrown when an invalid cache storage level is encountered
|
||||
Attributes:
|
||||
value: string representing the error
|
||||
"""
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
|
||||
class StorageUtils(object):
|
||||
"""storage util functions"""
|
||||
|
||||
|
@ -46,4 +59,5 @@ class StorageUtils(object):
|
|||
elif (storage_level_str == "OFF_HEAP"):
|
||||
return StorageLevel.OFF_HEAP
|
||||
else:
|
||||
return StorageLevel.MEMORY_ONLY
|
||||
raise InvalidCacheStorageLevelException(
|
||||
"Unrecognized cache storage level: %s" % storage_level_str)
|
||||
|
|
Loading…
Reference in New Issue