# Monasca Transform Generic Aggregation Components

## Introduction
Monasca Transform uses the standard ETL (Extract-Transform-Load) design pattern to aggregate Monasca metrics, and uses a data/configuration driven mechanism to drive processing. It accomplishes data aggregation in two distinct steps, each driven by an external configuration specification, namely `pre_transform_spec` and `transform_spec`.
## 1: Conversion of incoming metrics to record store data format

In the first step, the incoming metrics are converted into a canonical data format called record store data, using `pre_transform_spec`.

This logical processing data flow is explained in more detail in the Monasca/Transform wiki, Logical processing data flow section: Conversion to record store format. It includes the following operations:

- identifying metrics that are required (in other words, filtering out unwanted metrics)
- validation and extraction of essential data in the metric
- generating multiple records for an incoming metric if it is to be aggregated in multiple ways
- conversion of the incoming metrics to the canonical record store data format (please refer to the record store section in Data Formats for more information on the record store format)
### Pre Transform Spec

Example `pre_transform_spec` for a metric:
```json
{
    "event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},
    "event_type":"cpu.total_logical_cores",
    "metric_id_list":["cpu_total_all","cpu_total_host","cpu_util_all","cpu_util_host"],
    "required_raw_fields_list":["creation_time"],
    "service_id":"host_metrics"
}
```
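The two operations a `pre_transform_spec` drives — validating required fields and fanning out one record per `metric_id` — can be sketched in plain Python. This is an illustrative sketch, not the actual monasca-transform implementation; the helper names `get_dotted` and `to_record_store` are hypothetical.

```python
# Hedged sketch of the pre-transform step: validate required raw fields
# (JSON dotted notation), then emit one record-store record per metric_id.

def get_dotted(metric, path):
    """Resolve a dotted path like "meta.region" against a nested dict."""
    value = metric
    for part in path.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value

def to_record_store(metric, spec):
    # Drop metrics missing any required raw field.
    if any(get_dotted(metric, f) is None
           for f in spec["required_raw_fields_list"]):
        return []
    # Fan out: one record per metric_id, so the same incoming metric
    # can be aggregated in multiple ways downstream.
    return [dict(metric, metric_id=mid) for mid in spec["metric_id_list"]]

spec = {
    "event_type": "cpu.total_logical_cores",
    "metric_id_list": ["cpu_total_all", "cpu_total_host"],
    "required_raw_fields_list": ["creation_time"],
}
records = to_record_store({"creation_time": 1510081200, "value": 4.0}, spec)
# Two records are produced, one per metric_id; a metric without
# creation_time would be filtered out entirely.
```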
**List of fields**

| field name | values | description |
|---|---|---|
| event_processing_params | Set default field values: `set_default_zone_to`, `set_default_geolocation_to`, `set_default_region_to` | Sets default values for certain fields in the record store data |
| event_type | Name of the metric | Identifies the metric that needs to be aggregated |
| metric_id_list | List of `metric_id`'s | List of identifiers; should match `metric_id` in transform specs. This is used by the record generation step to generate multiple records if this metric is to be aggregated in multiple ways |
| required_raw_fields_list | List of fields | List of fields (in JSON dotted notation) that are required in the incoming metric; used for validating the incoming metric |
| service_id | Service identifier | Identifies the service to which this metric belongs. Note: this field is not yet used |
## 2: Data aggregation using generic aggregation components

In the second step, the canonical record store data is aggregated using `transform_spec`. Each `transform_spec` defines a series of generic aggregation components, which are specified in the `aggregation_params_map.aggregation_pipeline` section (see the `transform_spec` example below).

Any parameters used by the generic aggregation components are also specified in the `aggregation_params_map` section (see other parameters, e.g. `aggregated_metric_name`, `aggregation_period`, `aggregation_group_by_list` etc. in the `transform_spec` example below).
### Transform Specs

Example `transform_spec` for a metric:
```json
{"aggregation_params_map":{
    "aggregation_pipeline":{
        "source":"streaming",
        "usage":"fetch_quantity",
        "setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],
        "insert":["prepare_data","insert_data_pre_hourly"]
    },
    "aggregated_metric_name":"cpu.total_logical_cores_agg",
    "aggregation_period":"hourly",
    "aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
    "usage_fetch_operation": "avg",
    "filter_by_list": [],
    "setter_rollup_group_by_list": [],
    "setter_rollup_operation": "sum",
    "dimension_list":["aggregation_period","host","project_id"],
    "pre_hourly_operation":"avg",
    "pre_hourly_group_by_list":["default"]
},
"metric_group":"cpu_total_all",
"metric_id":"cpu_total_all"
}
```
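Conceptually, the `aggregation_pipeline` section is a recipe: component names are looked up in a registry and applied in order (usage, then setters, then insert). The following sketch shows that data/configuration driven dispatch with two toy components; the registry and the component bodies are illustrative stand-ins, not the real monasca-transform components.

```python
# Hedged sketch of pipeline dispatch driven by aggregation_params_map.
# Toy components: the real ones operate on Spark DataFrames.

def fetch_quantity(params, records):
    # Stand-in usage component: average the incoming values.
    vals = [r["value"] for r in records]
    return [{"quantity": sum(vals) / len(vals)}]

def set_aggregated_metric_name(params, rows):
    # Stand-in setter component: stamp the configured metric name.
    for r in rows:
        r["aggregated_metric_name"] = params["aggregated_metric_name"]
    return rows

COMPONENTS = {
    "fetch_quantity": fetch_quantity,
    "set_aggregated_metric_name": set_aggregated_metric_name,
}

def run_pipeline(agg_params, records):
    pipeline = agg_params["aggregation_pipeline"]
    rows = COMPONENTS[pipeline["usage"]](agg_params, records)
    for name in pipeline["setters"]:
        rows = COMPONENTS[name](agg_params, rows)
    return rows

params = {
    "aggregation_pipeline": {"usage": "fetch_quantity",
                             "setters": ["set_aggregated_metric_name"]},
    "aggregated_metric_name": "cpu.total_logical_cores_agg",
}
out = run_pipeline(params, [{"value": 2.0}, {"value": 4.0}])
```

Because the components are looked up by name, swapping the spec changes the pipeline without any code change — which is the point of the configuration-driven design.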
#### aggregation_params_map

This section specifies the `aggregation_pipeline` and the other parameters used by the generic aggregation components in the pipeline.

##### aggregation_pipeline

Specifies the generic aggregation components that should be used to process incoming metrics.

Note: generic aggregation components are re-usable and can be used to build different aggregation pipelines as required.
**List of fields**

| field name | values | description |
|---|---|---|
| source | streaming | Source is `streaming`. In the future this can be used to specify a component which can fetch data directly from the Monasca datastore |
| usage | `fetch_quantity`, `fetch_quantity_util`, `calculate_rate` | Usage Components |
| setters | `pre_hourly_calculate_rate`, `rollup_quantity`, `set_aggregated_metric_name`, `set_aggregated_period` | Setter Components |
| insert | `insert_data`, `insert_data_pre_hourly` | Insert Components |
##### Other parameters

Specifies parameters that generic aggregation components use to process and aggregate data.

**List of Other parameters**

| Parameter Name | Values | Description | Used by |
|---|---|---|---|
| aggregated_metric_name | e.g. "cpu.total_logical_cores_agg" | Name of the aggregated metric | set_aggregated_metric_name |
| aggregation_period | "hourly", "minutely" or "secondly" | Period over which to aggregate data | fetch_quantity, fetch_quantity_util, calculate_rate, set_aggregated_period, rollup_quantity |
| aggregation_group_by_list | e.g. "project_id", "hostname" | Group record store data by these columns | fetch_quantity, fetch_quantity_util, calculate_rate |
| usage_fetch_operation | e.g. "sum" | After the data is grouped by `aggregation_group_by_list`, perform this operation to find the aggregated metric value | fetch_quantity, fetch_quantity_util, calculate_rate |
| filter_by_list | Filter regex | Filter data using a regex on a record store column value | fetch_quantity, fetch_quantity_util, calculate_rate |
| setter_rollup_group_by_list | e.g. "project_id" | Group by this set of fields | rollup_quantity |
| setter_rollup_operation | e.g. "avg" | After data is grouped by `setter_rollup_group_by_list`, perform this operation to find the aggregated metric value | rollup_quantity |
| dimension_list | e.g. "aggregation_period", "host", "project_id" | List of fields which specify dimensions in the aggregated metric | insert_data, insert_data_pre_hourly |
| pre_hourly_group_by_list | e.g. "default" | List of instance usage data fields to group by when aggregating data | pre_hourly_processor |
| pre_hourly_operation | e.g. "avg" | When aggregating data published to metrics_pre_hourly every hour, perform this operation to find the hourly aggregated metric value | pre_hourly_processor |
##### metric_group and metric_id

Specifies a metric or list of metrics from the record store data which will be processed by this `transform_spec`. Note: this can be a single metric or a group of metrics that will be combined to produce the final aggregated metric.

**List of fields**

| field name | values | description |
|---|---|---|
| metric_group | unique transform spec group identifier | Group identifier for this transform spec, e.g. "cpu_total_all" |
| metric_id | unique transform spec identifier | Identifier for this transform spec, e.g. "cpu_total_all" |

Note: "metric_id" is a misnomer; it is not really a metric identifier, but rather an identifier for the transformation spec. This will be changed to "transform_spec_id" in the future.
## Generic Aggregation Components
### Usage Components

All usage components implement a method

```python
def usage(transform_context, record_store_df):
    ...
    return instance_usage_df
```
#### fetch_quantity

This component groups record store records by `aggregation_group_by_list`, sorts within each group by the timestamp field, and finds the usage based on `usage_fetch_operation`. Optionally, this component also takes a `filter_by_list` to include or exclude certain records from the usage calculation.
**Other parameters**

- `aggregation_group_by_list`

  List of fields to group by.
  Possible values: any set of fields in record store data.
  Example: `"aggregation_group_by_list": ["tenant_id"]`

- `usage_fetch_operation`

  Operation to be performed on the grouped data set.
  Possible values: "sum", "max", "min", "avg", "latest", "oldest"

- `aggregation_period`

  Period to aggregate by.
  Possible values: "daily", "hourly", "minutely", "secondly".
  Example: `"aggregation_period": "hourly"`

- `filter_by_list`

  Filter (include or exclude) record store data as specified.
  Example: `"filter_by_list": [{"field_to_filter": "hostname", "filter_expression": "comp-(\d)+", "filter_operation": "include"}]`
  or: `"filter_by_list": [{"field_to_filter": "hostname", "filter_expression": "controller-(\d)+", "filter_operation": "exclude"}]`
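The fetch_quantity semantics described above can be sketched on plain Python lists (the real component runs on Spark DataFrames): apply `filter_by_list`, group by `aggregation_group_by_list`, sort each group by timestamp, then reduce with `usage_fetch_operation`. This is an illustrative sketch, not the actual implementation.

```python
# Hedged sketch of fetch_quantity: filter, group, sort by timestamp, reduce.
import re
from collections import defaultdict

def fetch_quantity(records, group_by, operation, filter_by=()):
    # filter_by_list: include or exclude rows whose field matches the regex.
    for f in filter_by:
        keep = f["filter_operation"] == "include"
        pat = re.compile(f["filter_expression"])
        records = [r for r in records
                   if bool(pat.match(str(r[f["field_to_filter"]]))) == keep]
    groups = defaultdict(list)
    for r in records:
        groups[tuple(r[k] for k in group_by)].append(r)
    ops = {
        "sum": sum, "max": max, "min": min,
        "avg": lambda vs: sum(vs) / len(vs),
        "latest": lambda vs: vs[-1], "oldest": lambda vs: vs[0],
    }
    result = {}
    for key, rows in groups.items():
        rows.sort(key=lambda r: r["timestamp"])  # sort within group by time
        result[key] = ops[operation]([r["value"] for r in rows])
    return result

records = [
    {"hostname": "comp-1", "timestamp": 1, "value": 2.0},
    {"hostname": "comp-1", "timestamp": 2, "value": 4.0},
    {"hostname": "ctl-1", "timestamp": 1, "value": 9.0},
]
filters = [{"field_to_filter": "hostname",
            "filter_expression": r"comp-(\d)+",
            "filter_operation": "include"}]
usage = fetch_quantity(records, ["hostname"], "avg", filters)
# usage == {("comp-1",): 3.0} -- the controller host is filtered out.
```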
#### fetch_quantity_util

This component finds the utilized quantity based on `total_quantity` and `idle_perc`, using the following calculation:

    utilized_quantity = (100 - idle_perc) * total_quantity / 100

where

- `total_quantity` data is identified by the `usage_fetch_util_quantity_event_type` parameter, and
- `idle_perc` data is identified by the `usage_fetch_util_idle_perc_event_type` parameter.

This component initially groups record store records by `aggregation_group_by_list` and `event_type`, sorts within each group by the timestamp field, and calculates the `total_quantity` and `idle_perc` values based on `usage_fetch_operation`. `utilized_quantity` is then calculated using the formula given above.
**Other parameters**

- `aggregation_group_by_list`

  List of fields to group by.
  Possible values: any set of fields in record store data.
  Example: `"aggregation_group_by_list": ["tenant_id"]`

- `usage_fetch_operation`

  Operation to be performed on the grouped data set.
  Possible values: "sum", "max", "min", "avg", "latest", "oldest"

- `aggregation_period`

  Period to aggregate by.
  Possible values: "daily", "hourly", "minutely", "secondly".
  Example: `"aggregation_period": "hourly"`

- `filter_by_list`

  Filter (include or exclude) record store data as specified.
  Example: `"filter_by_list": [{"field_to_filter": "hostname", "filter_expression": "comp-(\d)+", "filter_operation": "include"}]`
  or: `"filter_by_list": [{"field_to_filter": "hostname", "filter_expression": "controller-(\d)+", "filter_operation": "exclude"}]`

- `usage_fetch_util_quantity_event_type`

  Event type (metric name) identifying the data which will be used to calculate `total_quantity`.
  Possible values: metric name.
  Example: `"usage_fetch_util_quantity_event_type": "cpu.total_logical_cores"`

- `usage_fetch_util_idle_perc_event_type`

  Event type (metric name) identifying the data which will be used to calculate `idle_perc`.
  Possible values: metric name.
  Example: `"usage_fetch_util_idle_perc_event_type": "cpu.idle_perc"`
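The utilized-quantity formula above is simple enough to state directly as code; this is just the formula, not the component itself:

```python
# The fetch_quantity_util formula: utilized = (100 - idle_perc) * total / 100.

def utilized_quantity(total_quantity, idle_perc):
    return (100 - idle_perc) * total_quantity / 100

# 8 logical cores at 75% idle -> 2 cores utilized.
util = utilized_quantity(8.0, 75.0)
```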
#### calculate_rate

This component finds the rate of change of quantity (in percent) over a time period, using the following calculation:

    rate_of_change (in percent) = ((oldest_quantity - latest_quantity) / oldest_quantity) * 100

where

- `oldest_quantity` is the oldest (or earliest) average quantity if there are multiple quantities in a group for a given time period, and
- `latest_quantity` is the latest average quantity if there are multiple quantities in a group for a given time period.
**Other parameters**

- `aggregation_group_by_list`

  List of fields to group by.
  Possible values: any set of fields in record store data.
  Example: `"aggregation_group_by_list": ["tenant_id"]`

- `usage_fetch_operation`

  Operation to be performed on the grouped data set.
  Possible values: "sum", "max", "min", "avg", "latest", "oldest"

- `aggregation_period`

  Period to aggregate by.
  Possible values: "daily", "hourly", "minutely", "secondly".
  Example: `"aggregation_period": "hourly"`

- `filter_by_list`

  Filter (include or exclude) record store data as specified.
  Example: `"filter_by_list": [{"field_to_filter": "hostname", "filter_expression": "comp-(\d)+", "filter_operation": "include"}]`
  or: `"filter_by_list": [{"field_to_filter": "hostname", "filter_expression": "controller-(\d)+", "filter_operation": "exclude"}]`
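The rate-of-change formula above, stated as code (a sketch of the arithmetic only, assuming the quantities are already time-ordered):

```python
# calculate_rate formula: ((oldest - latest) / oldest) * 100,
# applied to a time-ordered list of quantities.

def rate_of_change(quantities):
    oldest, latest = quantities[0], quantities[-1]
    return (oldest - latest) / oldest * 100

# Quantity dropped from 200 to 150 over the period -> 25% change.
rate = rate_of_change([200.0, 180.0, 150.0])
```

Note that with this sign convention, a decreasing quantity yields a positive rate and an increasing quantity yields a negative one.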
### Setter Components

All setter components implement a method

```python
def setter(transform_context, instance_usage_df):
    ...
    return instance_usage_df
```
#### set_aggregated_metric_name

This component sets the final aggregated metric name by setting the `aggregated_metric_name` field in the `instance_usage` data.

**Other parameters**

- `aggregated_metric_name`

  Name of the aggregated metric being generated.
  Possible values: any aggregated metric name. The convention is to end the metric name with "_agg".
  Example: `"aggregated_metric_name": "cpu.total_logical_cores_agg"`
#### set_aggregated_period

This component sets the final aggregation period by setting the `aggregation_period` field in the `instance_usage` data.

**Other parameters**

- `aggregation_period`

  Period of the aggregated metric being generated.
  Possible values: "daily", "hourly", "minutely", "secondly".
  Example: `"aggregation_period": "hourly"`

Note: if you are publishing metrics to the metrics_pre_hourly Kafka topic using the `insert_data_pre_hourly` component (see the insert_data_pre_hourly component below), `aggregation_period` will have to be set to "hourly", since by default all data in the metrics_pre_hourly topic gets aggregated every hour by the Pre Hourly Processor (see the Processors section below).
#### rollup_quantity

This component groups `instance_usage` records by `setter_rollup_group_by_list`, sorts within each group by the timestamp field, and finds the usage based on `setter_rollup_operation`.

**Other parameters**

- `setter_rollup_group_by_list`

  List of fields to group by.
  Possible values: any set of fields in instance usage data.
  Example: `"setter_rollup_group_by_list": ["tenant_id"]`

- `setter_rollup_operation`

  Operation to be performed on the grouped data set.
  Possible values: "sum", "max", "min", "avg"
  Example: `"setter_rollup_operation": "avg"`

- `aggregation_period`

  Period to aggregate by.
  Possible values: "daily", "hourly", "minutely", "secondly".
  Example: `"aggregation_period": "hourly"`
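The rollup step can be sketched the same way as the usage step: regroup the already-aggregated `instance_usage` rows by the rollup group-by fields and reduce each group. This is an illustrative sketch, not the Spark implementation.

```python
# Hedged sketch of rollup_quantity: regroup instance_usage rows by
# setter_rollup_group_by_list, then apply setter_rollup_operation.
from collections import defaultdict

def rollup(rows, group_by, operation):
    groups = defaultdict(list)
    for r in rows:
        groups[tuple(r[k] for k in group_by)].append(r["quantity"])
    ops = {"sum": sum, "max": max, "min": min,
           "avg": lambda vs: sum(vs) / len(vs)}
    return {k: ops[operation](vs) for k, vs in groups.items()}

rows = [
    {"tenant_id": "t1", "host": "h1", "quantity": 4.0},
    {"tenant_id": "t1", "host": "h2", "quantity": 6.0},
]
# Per-host quantities rolled up to one value per tenant.
rolled = rollup(rows, ["tenant_id"], "sum")
# rolled == {("t1",): 10.0}
```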
### Insert Components

All insert components implement a method

```python
def insert(transform_context, instance_usage_df):
    ...
    return instance_usage_df
```
#### insert_data

This component converts `instance_usage` data into the Monasca metric format and writes the metric to the metrics topic in Kafka.

**Other parameters**

- `dimension_list`

  List of fields in `instance_usage` data that should be converted to Monasca metric dimensions.
  Possible values: any fields in `instance_usage` data.
  Example: `"dimension_list": ["aggregation_period","host","project_id"]`
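The effect of `dimension_list` is that only the listed `instance_usage` fields survive as dimensions of the published metric. A sketch (the metric envelope below is simplified, not the full Monasca metric format):

```python
# Hedged sketch of the dimension_list step in insert_data: project the
# listed instance_usage fields into the metric's dimensions.

def to_metric(usage, dimension_list):
    return {
        "name": usage["aggregated_metric_name"],
        "dimensions": {k: usage[k] for k in dimension_list},
        "value": usage["quantity"],
    }

usage = {"aggregated_metric_name": "cpu.total_logical_cores_agg",
         "aggregation_period": "hourly", "host": "h1",
         "project_id": "p1", "quantity": 8.0,
         "extra_field": "not a dimension"}
metric = to_metric(usage, ["aggregation_period", "host", "project_id"])
# metric["dimensions"] keeps only the three listed fields.
```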
#### insert_data_pre_hourly

This component converts `instance_usage` data into the Monasca metric format and writes the metric to the metrics_pre_hourly topic in Kafka.

**Other parameters**

- `dimension_list`

  List of fields in `instance_usage` data that should be converted to Monasca metric dimensions.
  Possible values: any fields in `instance_usage` data.
  Example: `"dimension_list": ["aggregation_period","host","project_id"]`
## Processors

Processors are special components that process data from a Kafka topic at a desired time interval. They are different from generic aggregation components since they process data from a specific Kafka topic.

All processor components implement the following methods

```python
def get_app_name(self):
    [...]
    return app_name

def is_time_to_run(self, current_time):
    if current_time > last_invoked + 1:
        return True
    else:
        return False

def run_processor(self, time):
    # do work...
```
### pre_hourly_processor

The Pre Hourly Processor runs every hour and aggregates `instance_usage` data published to the metrics_pre_hourly topic.

By default, the Pre Hourly Processor is set to run 10 minutes after the top of the hour and processes data from the previous hour. `instance_usage` data is grouped by `pre_hourly_group_by_list`.

**Other parameters**

- `pre_hourly_group_by_list`

  List of fields to group by.
  Possible values: any set of fields in `instance_usage` data, or "default".
  Note: setting this to "default" will group `instance_usage` data by `tenant_id`, `user_id`, `resource_uuid`, `geolocation`, `region`, `zone`, `host`, `project_id`, `aggregated_metric_name` and `aggregation_period`.
  Example: `"pre_hourly_group_by_list": ["tenant_id"]` or `"pre_hourly_group_by_list": ["default"]`

- `pre_hourly_operation`

  Operation to be performed on the grouped data set.
  Possible values: "sum", "max", "min", "avg", "rate"
  Example: `"pre_hourly_operation": "avg"`
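The "default" group-by behaviour described above can be sketched as follows. This is illustrative only (the "rate" operation is omitted, and the real processor runs on Spark); the `DEFAULT_GROUP_BY` list mirrors the default fields named in the note above.

```python
# Hedged sketch of the pre-hourly rollup: expand "default" to the default
# field set, group, then apply pre_hourly_operation per group.
from collections import defaultdict

DEFAULT_GROUP_BY = ["tenant_id", "user_id", "resource_uuid", "geolocation",
                    "region", "zone", "host", "project_id",
                    "aggregated_metric_name", "aggregation_period"]

def pre_hourly_rollup(rows, group_by, operation):
    if group_by == ["default"]:
        group_by = DEFAULT_GROUP_BY
    groups = defaultdict(list)
    for r in rows:
        groups[tuple(r.get(k) for k in group_by)].append(r["quantity"])
    ops = {"sum": sum, "max": max, "min": min,
           "avg": lambda vs: sum(vs) / len(vs)}
    return {k: ops[operation](vs) for k, vs in groups.items()}

# Two pre-hourly rows that agree on every default field collapse into
# one hourly value.
rows = [dict.fromkeys(DEFAULT_GROUP_BY, "x") | {"quantity": 2.0},
        dict.fromkeys(DEFAULT_GROUP_BY, "x") | {"quantity": 4.0}]
hourly = pre_hourly_rollup(rows, ["default"], "avg")
```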
## Putting it all together

Please refer to the Create a new aggregation pipeline document to create a new aggregation pipeline.