Monasca Transform Generic Aggregation Components

Introduction

Monasca Transform uses the standard ETL (Extract-Transform-Load) design pattern to aggregate monasca metrics, and a data/configuration driven mechanism to drive processing. It accomplishes data aggregation in two distinct steps, each driven by an external configuration specification: pre_transform_spec and transform_spec.

1: Conversion of incoming metrics to record store data format

In the first step, the incoming metrics are converted into a canonical data format called record store data, using pre_transform_spec.

This logical processing data flow is explained in more detail in the Monasca/Transform wiki, Logical processing data flow section: Conversion to record store format, and includes the following operations (a minimal sketch follows the list):

  • identifying metrics that are required (in other words, filtering out unwanted metrics)

  • validation and extraction of essential data in the metric

  • generation of multiple records for an incoming metric if it is to be aggregated in multiple ways, and finally

  • conversion of the incoming metric to the canonical record store data format. Please refer to the record store section in Data Formats for more information on the record store format.
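
As a rough illustration of this step, the minimal plain-Python sketch below shows how a pre_transform_spec could drive validation and record generation. The function name to_record_store and the metric envelope fields (metric.value, metric.timestamp) are assumptions for illustration and do not mirror the project's actual Spark code.

    # Illustrative sketch only: shows how a pre_transform_spec could drive
    # validation and fan-out of an incoming metric into record store records.
    # Names and the metric envelope layout are assumptions, not actual code.

    def to_record_store(metric, pre_transform_spec):
        # drop the metric if any required raw field is missing
        for field in pre_transform_spec["required_raw_fields_list"]:
            value = metric
            for part in field.split("."):          # JSON dotted notation
                value = value.get(part) if isinstance(value, dict) else None
            if value is None:
                return []                          # metric fails validation

        # generate one record per metric_id so the same metric can be
        # aggregated in multiple ways by different transform specs
        records = []
        for metric_id in pre_transform_spec["metric_id_list"]:
            records.append({
                "event_type": pre_transform_spec["event_type"],
                "metric_id": metric_id,
                "quantity": metric["metric"]["value"],
                "timestamp": metric["metric"]["timestamp"],
            })
        return records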

Pre Transform Spec

Example pre_transform_spec for metric

{
  "event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},
  "event_type":"cpu.total_logical_cores",
  "metric_id_list":["cpu_total_all","cpu_total_host","cpu_util_all","cpu_util_host"],
  "required_raw_fields_list":["creation_time"],
  "service_id":"host_metrics"
}

List of fields

| field name | values | description |
| ---------- | ------ | ----------- |
| event_processing_params | Set default field values: set_default_zone_to, set_default_geolocation_to, set_default_region_to | Set default values for certain fields in the record store data |
| event_type | Name of the metric | Identifies the metric that needs to be aggregated |
| metric_id_list | List of metric_id's | List of identifiers; should match metric_id in transform specs. This is used by the record generation step to generate multiple records if this metric is to be aggregated in multiple ways |
| required_raw_fields_list | List of fields | List of fields (in JSON dotted notation) that are required in the incoming metric, used for validating the incoming metric |
| service_id | Service identifier | Identifies the service to which this metric belongs. Note: this field is not yet used |

2: Data aggregation using generic aggregation components

In the second step, the canonical record store data is aggregated using transform_spec. Each transform_spec defines a series of generic aggregation components, which are specified in the aggregation_params_map.aggregation_pipeline section (see the transform_spec example below).

Any parameters used by the generic aggregation components are also specified in the aggregation_params_map section (see the other parameters, e.g. aggregated_metric_name, aggregation_period, aggregation_group_by_list etc., in the transform_spec example below).

Transform Specs

Example transform_spec for metric

{"aggregation_params_map":{
    "aggregation_pipeline":{
        "source":"streaming",
        "usage":"fetch_quantity",
        "setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],
        "insert":["prepare_data","insert_data_pre_hourly"]
    },
    "aggregated_metric_name":"cpu.total_logical_cores_agg",
    "aggregation_period":"hourly",
    "aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
    "usage_fetch_operation": "avg",
    "filter_by_list": [],
    "setter_rollup_group_by_list": [],
    "setter_rollup_operation": "sum",
    "dimension_list":["aggregation_period","host","project_id"],
    "pre_hourly_operation":"avg",
    "pre_hourly_group_by_list":["default"]
 },
 "metric_group":"cpu_total_all",
 "metric_id":"cpu_total_all"
}

aggregation_params_map

This section specifies the aggregation_pipeline and the other parameters used by the generic aggregation components in the aggregation_pipeline.

aggregation_pipeline

Specifies generic aggregation components that should be used to process incoming metrics.

Note: generic aggregation components are re-usable and can be used to build different aggregation pipelines as required.
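
As a rough sketch of how such a pipeline could be chained, the hypothetical driver below applies the configured usage component, then the setters in order, then the insert components. The registry dictionaries and the run_pipeline name are illustrative assumptions, not the project's actual driver code.

    # Hypothetical sketch of how an aggregation_pipeline could be chained.
    # Component registries and function names are illustrative only.

    def run_pipeline(transform_context, record_store_df, pipeline_spec,
                     usage_components, setter_components, insert_components):
        # usage step: record store data -> instance usage data
        usage = usage_components[pipeline_spec["usage"]]
        instance_usage_df = usage(transform_context, record_store_df)

        # setter steps: enrich / roll up instance usage data, in order
        for name in pipeline_spec["setters"]:
            instance_usage_df = setter_components[name](transform_context,
                                                        instance_usage_df)

        # insert steps: publish aggregated metrics to kafka topics
        for name in pipeline_spec["insert"]:
            instance_usage_df = insert_components[name](transform_context,
                                                        instance_usage_df)
        return instance_usage_df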

List of fields

| field name | values | description |
| ---------- | ------ | ----------- |
| source | streaming | Source is streaming. In the future this can be used to specify a component which can fetch data directly from the monasca datastore |
| usage | fetch_quantity, fetch_quantity_util, calculate_rate | Usage Components |
| setters | pre_hourly_calculate_rate, rollup_quantity, set_aggregated_metric_name, set_aggregated_period | Setter Components |
| insert | insert_data, insert_data_pre_hourly | Insert Components |

Other parameters

Specifies parameters that generic aggregation components use to process and aggregate data.

List of Other parameters

| Parameter Name | Values | Description | Used by |
| -------------- | ------ | ----------- | ------- |
| aggregated_metric_name | e.g. "cpu.total_logical_cores_agg" | Name of the aggregated metric | set_aggregated_metric_name |
| aggregation_period | "hourly", "minutely" or "secondly" | Period over which to aggregate data | fetch_quantity, fetch_quantity_util, calculate_rate, set_aggregated_period, rollup_quantity |
| aggregation_group_by_list | e.g. "project_id", "hostname" | Group record_store data with these columns | fetch_quantity, fetch_quantity_util, calculate_rate |
| usage_fetch_operation | e.g. "sum" | After the data is grouped by aggregation_group_by_list, perform this operation to find the aggregated metric value | fetch_quantity, fetch_quantity_util, calculate_rate |
| filter_by_list | Filter regex | Filter data using a regex on a record_store column value | fetch_quantity, fetch_quantity_util, calculate_rate |
| setter_rollup_group_by_list | e.g. "project_id" | Group by this set of fields | rollup_quantity |
| setter_rollup_operation | e.g. "avg" | After data is grouped by setter_rollup_group_by_list, perform this operation to find the aggregated metric value | rollup_quantity |
| dimension_list | e.g. "aggregation_period", "host", "project_id" | List of fields which specify dimensions in the aggregated metric | insert_data, insert_data_pre_hourly |
| pre_hourly_group_by_list | e.g. "default" | List of instance usage data fields to group by when aggregating data | pre_hourly_processor |
| pre_hourly_operation | e.g. "avg" | When aggregating data published to metrics_pre_hourly every hour, perform this operation to find the hourly aggregated metric value | pre_hourly_processor |

metric_group and metric_id

Specifies the metric or group of metrics from the record store data that will be processed by this transform_spec. Note: this can be a single metric or a group of metrics that will be combined to produce the final aggregated metric.

List of fields

| field name | values | description |
| ---------- | ------ | ----------- |
| metric_group | unique transform spec group identifier | Group identifier for this transform spec, e.g. "cpu_total_all" |
| metric_id | unique transform spec identifier | Identifier for this transform spec, e.g. "cpu_total_all" |

Note: "metric_id" is a misnomer, it is not really a metric group/or metric identifier but rather identifier for transformation spec. This will be changed to "transform_spec_id" in the future.

Generic Aggregation Components

List of Generic Aggregation Components

Usage Components

All usage components implement a method

    def usage(transform_context, record_store_df):
        ..
        ..
        return instance_usage_df

fetch_quantity

This component groups record store records by aggregation_group_by_list, sorts within each group by the timestamp field, and finds the usage based on usage_fetch_operation. Optionally this component also takes filter_by_list to include or exclude certain records from the usage calculation.
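
A minimal plain-Python sketch of the grouping and reduction idea is shown below; the real component operates on Spark DataFrames, so the fetch_quantity function and row layout here are illustrative assumptions only.

    # Illustrative only: group record store rows and reduce each group with
    # the configured usage_fetch_operation. Not the actual Spark code.
    from collections import defaultdict

    OPERATIONS = {
        "sum": sum,
        "max": max,
        "min": min,
        "avg": lambda values: sum(values) / len(values),
        "latest": lambda values: values[-1],   # rows are sorted by timestamp
        "oldest": lambda values: values[0],
    }

    def fetch_quantity(rows, group_by_list, operation):
        groups = defaultdict(list)
        for row in sorted(rows, key=lambda r: r["timestamp"]):
            key = tuple(row[field] for field in group_by_list)
            groups[key].append(row["quantity"])
        return {key: OPERATIONS[operation](values)
                for key, values in groups.items()}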

Other parameters

  • aggregation_group_by_list

    List of fields to group by.

    Possible values: any set of fields in record store data.

    Example:

    "aggregation_group_by_list": ["tenant_id"]
    
  • usage_fetch_operation

    Operation to be performed on grouped data set.

    Possible values: "sum", "max", "min", "avg", "latest", "oldest"

  • aggregation_period

    Period to aggregate by.

    Possible values: 'daily', 'hourly', 'minutely', 'secondly'.

    Example:

    "aggregation_period": "hourly"
    
  • filter_by_list

    Filter (include or exclude) record store data as specified.

    Example:

    filter_by_list": "[{"field_to_filter": "hostname",
                        "filter_expression": "comp-(\d)+",
                        "filter_operation": "include"}]
    

    OR

    filter_by_list": "[{"field_to_filter": "hostname",
                        "filter_expression": "controller-(\d)+",
                        "filter_operation": "exclude"}]
    

fetch_quantity_util

This component finds the utilized quantity based on total_quantity and idle_perc, using the following calculation:

utilized_quantity = (100 - idle_perc) * total_quantity / 100

where,

  • total_quantity data, identified by usage_fetch_util_quantity_event_type parameter and

  • idle_perc data, identified by usage_fetch_util_idle_perc_event_type parameter

This component initially groups record store records by aggregation_group_by_list and event_type, sorts within each group by the timestamp field, and calculates total_quantity and idle_perc values based on usage_fetch_operation. utilized_quantity is then calculated using the formula given above.
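
For example, with a total_quantity of 8 logical cores and an idle_perc of 75, utilized_quantity = (100 - 75) * 8 / 100 = 2.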

Other parameters

  • aggregation_group_by_list

    List of fields to group by.

    Possible values: any set of fields in record store data.

    Example:

    "aggregation_group_by_list": ["tenant_id"]
    
  • usage_fetch_operation

    Operation to be performed on grouped data set

    Possible values: "sum", "max", "min", "avg", "latest", "oldest"

  • aggregation_period

    Period to aggregate by.

    Possible values: 'daily', 'hourly', 'minutely', 'secondly'.

    Example:

    "aggregation_period": "hourly"
    
  • filter_by_list

    Filter (include or exclude) record store data as specified

    Example:

    filter_by_list": "[{"field_to_filter": "hostname",
                        "filter_expression": "comp-(\d)+",
                        "filter_operation": "include"}]
    

    OR

    filter_by_list": "[{"field_to_filter": "hostname",
                        "filter_expression": "controller-(\d)+",
                        "filter_operation": "exclude"}]
    
  • usage_fetch_util_quantity_event_type

    event type (metric name) to identify data which will be used to calculate total_quantity

    Possible values: metric name

    Example:

    "usage_fetch_util_quantity_event_type": "cpu.total_logical_cores"
    
  • usage_fetch_util_idle_perc_event_type

    event type (metric name) to identify data which will be used to calculate idle_perc

    Possible values: metric name

    Example:

    "usage_fetch_util_idle_perc_event_type": "cpu.idle_perc"
    

calculate_rate

This component finds the rate of change of a quantity (in percent) over a time period, using the following calculation:

rate_of_change (in percent) = ((oldest_quantity - latest_quantity)/oldest_quantity) * 100

where,

  • oldest_quantity: oldest (or earliest) average quantity if there are multiple quantities in a group for a given time period

  • latest_quantity: latest average quantity if there are multiple quantities in a group for a given time period
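
For example, if the oldest average quantity in the period is 100 and the latest average quantity is 90, then rate_of_change = ((100 - 90) / 100) * 100 = 10 percent.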

Other parameters

  • aggregation_group_by_list

    List of fields to group by.

    Possible values: any set of fields in record store data.

    Example:

    "aggregation_group_by_list": ["tenant_id"]
    
  • usage_fetch_operation

    Operation to be performed on grouped data set

    Possible values: "sum", "max", "min", "avg", "latest", "oldest"

  • aggregation_period

    Period to aggregate by.

    Possible values: 'daily', 'hourly', 'minutely', 'secondly'.

    Example:

    "aggregation_period": "hourly"
    
  • filter_by_list

    Filter (include or exclude) record store data as specified

    Example:

    filter_by_list": "[{"field_to_filter": "hostname",
                        "filter_expression": "comp-(\d)+",
                        "filter_operation": "include"}]
    

    OR

    filter_by_list": "[{"field_to_filter": "hostname",
                        "filter_expression": "controller-(\d)+",
                        "filter_operation": "exclude"}]
    

Setter Components

All setter components implement a method

    def setter(transform_context, instance_usage_df):
        ..
        ..
        return instance_usage_df

set_aggregated_metric_name

This component sets the final aggregated metric name by setting the aggregated_metric_name field in the instance_usage data.

Other parameters

  • aggregated_metric_name

    Name of the aggregated metric being generated.

    Possible values: any aggregated metric name. Convention is to end the metric name with "_agg".

    Example:

    "aggregated_metric_name":"cpu.total_logical_cores_agg"
    

set_aggregated_period

This component sets the final aggregation period by setting the aggregation_period field in the instance_usage data.

Other parameters

  • aggregation_period

    Aggregation period for the metric being generated.

    Possible values: 'daily', 'hourly', 'minutely', 'secondly'.

    Example:

    "aggregation_period": "hourly"
    

Note: If you are publishing metrics to the metrics_pre_hourly kafka topic using the insert_data_pre_hourly component (see insert_data_pre_hourly component below), aggregation_period will have to be set to "hourly", since all data in the metrics_pre_hourly topic by default gets aggregated every hour by the Pre Hourly Processor (see Processors section below).

rollup_quantity

This component groups instance_usage records by setter_rollup_group_by_list, sorts within each group by the timestamp field, and finds the rolled-up quantity based on setter_rollup_operation.

Other parameters

  • setter_rollup_group_by_list

    List of fields to group by.

    Possible values: any set of fields in instance_usage data.

    Example:

    "setter_rollup_group_by_list": ["tenant_id"]
    
  • setter_rollup_operation

    Operation to be performed on the grouped data set.

    Possible values: "sum", "max", "min", "avg"

    Example:

    "setter_rollup_operation": "avg"
    
  • aggregation_period

    Period to aggregate by.

    Possible values: 'daily', 'hourly', 'minutely', 'secondly'.

    Example:

    "aggregation_period": "hourly"
    

Insert Components

All insert components implement a method

    def insert(transform_context, instance_usage_df):
        ..
        ..
        return instance_usage_df

insert_data

This component converts instance_usage data into monasca metric format and writes the metric to metrics topic in kafka.
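
As a rough sketch of this conversion, the snippet below builds a metric-style dictionary from an instance_usage record, using dimension_list to select dimensions. The exact field names of the monasca metric envelope are assumptions here, so treat this as illustrative only.

    # Illustrative only: build a metric-style dict from an instance_usage
    # record, using dimension_list to pick dimensions. Field names are
    # assumptions, not the exact monasca metric format.

    def to_metric(instance_usage_record, dimension_list):
        return {
            "metric": {
                "name": instance_usage_record["aggregated_metric_name"],
                "dimensions": {field: instance_usage_record[field]
                               for field in dimension_list},
                "timestamp": instance_usage_record["timestamp"],
                "value": instance_usage_record["quantity"],
            }
        }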

Other parameters

  • dimension_list

    List of fields in instance_usage data that should be converted to monasca metric dimensions.

    Possible values: any fields in instance_usage data

    Example:

    "dimension_list":["aggregation_period","host","project_id"]
    

insert_data_pre_hourly

This component converts instance_usage data into monasca metric format and writes the metric to metrics_pre_hourly topic in kafka.

Other parameters

  • dimension_list

    List of fields in instance_usage data that should be converted to monasca metric dimensions.

    Possible values: any fields in instance_usage data

    Example:

    "dimension_list":["aggregation_period","host","project_id"]
    

Processors

Processors are special components that process data from a kafka topic at a desired time interval. They differ from generic aggregation components in that they are tied to a specific kafka topic and run on a schedule.

All processor components implement the following methods

    def get_app_name(self):
        [...]
        return app_name

    def is_time_to_run(self, current_time):
        if current_time > last_invoked + 1:
            return True
        else:
            return False

    def run_processor(self, time):
        # do work...

pre_hourly_processor

The Pre Hourly Processor runs every hour and aggregates instance_usage data published to the metrics_pre_hourly topic.

By default the Pre Hourly Processor is set to run 10 minutes after the top of the hour and processes data from the previous hour. instance_usage data is grouped by pre_hourly_group_by_list.

Other parameters

  • pre_hourly_group_by_list

    List of fields to group by.

    Possible values: any set of fields in instance_usage data, or default

    Note: setting to default will group instance_usage data by tenant_id, user_id, resource_uuid, geolocation, region, zone, host, project_id, aggregated_metric_name, aggregation_period

    Example:

    "pre_hourly_group_by_list": ["tenant_id"]
    

    OR

    "pre_hourly_group_by_list": ["default"]
    
  • pre_hourly_operation

    Operation to be performed on grouped data set.

    Possible values: "sum", "max", "min", "avg", "rate"

    Example:

    "pre_hourly_operation": "avg"
    

Putting it all together

Please refer to the Create a new aggregation pipeline document to create a new aggregation pipeline.