monasca-transform/monasca_transform/transform/transform_utils.py
Ashwin Agate 022bd11a4d Switch to using Spark version 2.2.0
The following changes were required:

1.)
By default, the pre-built distribution
of Spark 2.2.0 is compiled with Scala 2.11.
monasca-transform requires Spark compiled
with Scala 2.10, since we use Spark Streaming
to pull data from Kafka and the version of
Kafka we use is compatible with Scala 2.10.
The recommended way is to compile Spark
with Scala 2.10, but for the purposes of the
devstack plugin, changes were made to pull
the required jars directly from Maven
(see the SPARK_JARS and SPARK_JAVA_LIB
variables in settings).
All jars are moved to
<SPARK_HOME>/assembly/target/assembly/
target/scala_2.10/jars/
Note: <SPARK_HOME>/jars is renamed
to <SPARK_HOME>/jars_original.
spark-submit falls back to the assembly
location if the <SPARK_HOME>/jars
directory is missing.

2.) Updated the startup scripts for the Spark
worker and Spark master with a new env
variable, SPARK_SCALA_VERSION=2.10. Also
updated the PYTHONPATH variable to add the
new py4j-0.10.4-src.zip file.

3.) Replaced deprecated pyspark function
calls which were removed in Spark 2.0.
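
For example, SQLContext.jsonRDD was removed
in Spark 2.0 in favor of the DataFrameReader
API (illustrative sketch, not the exact diff):

    # Spark 1.x style, removed in 2.0:
    # df = sql_context.jsonRDD(json_rdd, schema)
    # Spark 2.x replacement:
    df = sql_context.read.json(json_rdd, schema)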

Change-Id: I8f8393bb91307d55f156b2ebf45225a16ae9d8f4
2017-08-21 11:18:22 -07:00


# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from pyspark.sql import SQLContext
from pyspark.sql.types import ArrayType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import MapType
from pyspark.sql.types import StringType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType


class TransformUtils(object):
    """utility methods for different kinds of data."""

    @staticmethod
def _rdd_to_df(rdd, schema):
"""convert rdd to dataframe using schema."""
spark_context = rdd.context
sql_context = SQLContext.getOrCreate(spark_context)
if schema is None:
df = sql_context.createDataFrame(rdd)
else:
df = sql_context.createDataFrame(rdd, schema)
return df


class InstanceUsageUtils(TransformUtils):
    """utility methods to transform instance usage data."""

    @staticmethod
def _get_instance_usage_schema():
"""get instance usage schema."""
# Initialize columns for all string fields
columns = ["tenant_id", "user_id", "resource_uuid",
"geolocation", "region", "zone", "host", "project_id",
"aggregated_metric_name", "firstrecord_timestamp_string",
"lastrecord_timestamp_string",
"service_group", "service_id",
"usage_date", "usage_hour", "usage_minute",
"aggregation_period", "namespace", "pod_name", "app",
"container_name", "interface", "deployment", "daemon_set"]
columns_struct_fields = [StructField(field_name, StringType(), True)
for field_name in columns]
# Add columns for non-string fields
columns_struct_fields.append(StructField("firstrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("lastrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("quantity",
DoubleType(), True))
columns_struct_fields.append(StructField("record_count",
DoubleType(), True))
columns_struct_fields.append(StructField("processing_meta",
MapType(StringType(),
StringType(),
True),
True))
schema = StructType(columns_struct_fields)
return schema

    @staticmethod
def create_df_from_json_rdd(sql_context, jsonrdd):
"""create instance usage df from json rdd."""
schema = InstanceUsageUtils._get_instance_usage_schema()
instance_usage_schema_df = sql_context.read.json(jsonrdd, schema)
return instance_usage_schema_df


class RecordStoreUtils(TransformUtils):
    """utility methods to transform record store data."""

    @staticmethod
def _get_record_store_df_schema():
"""get instance usage schema."""
columns = ["event_timestamp_string",
"event_type", "event_quantity_name",
"event_status", "event_version",
"record_type", "resource_uuid", "tenant_id",
"user_id", "region", "zone",
"host", "project_id", "service_group", "service_id",
"event_date", "event_hour", "event_minute",
"event_second", "metric_group", "metric_id",
"namespace", "pod_name", "app", "container_name",
"interface", "deployment", "daemon_set"]
columns_struct_fields = [StructField(field_name, StringType(), True)
for field_name in columns]
        # Add columns for non-string fields
columns_struct_fields.insert(0,
StructField("event_timestamp_unix",
DoubleType(), True))
columns_struct_fields.insert(0,
StructField("event_quantity",
DoubleType(), True))
schema = StructType(columns_struct_fields)
return schema

    @staticmethod
def recordstore_rdd_to_df(record_store_rdd):
"""convert record store rdd to a dataframe."""
schema = RecordStoreUtils._get_record_store_df_schema()
return TransformUtils._rdd_to_df(record_store_rdd, schema)

    @staticmethod
def create_df_from_json(sql_context, jsonpath):
"""create a record store df from json file."""
schema = RecordStoreUtils._get_record_store_df_schema()
record_store_df = sql_context.read.json(jsonpath, schema)
return record_store_df


class TransformSpecsUtils(TransformUtils):
    """utility methods for transform_specs."""

    @staticmethod
def _get_transform_specs_df_schema():
"""get transform_specs df schema."""
# FIXME: change when transform_specs df is finalized
source = StructField("source", StringType(), True)
usage = StructField("usage", StringType(), True)
setters = StructField("setters", ArrayType(StringType(),
containsNull=False), True)
insert = StructField("insert", ArrayType(StringType(),
containsNull=False), True)
aggregation_params_map = \
StructField("aggregation_params_map",
StructType([StructField("aggregation_period",
StringType(), True),
StructField("dimension_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("aggregation_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("usage_fetch_operation",
StringType(),
True),
StructField("filter_by_list",
ArrayType(MapType(StringType(),
StringType(),
True)
)
),
StructField(
"usage_fetch_util_quantity_event_type",
StringType(),
True),
StructField(
"usage_fetch_util_idle_perc_event_type",
StringType(),
True),
StructField("setter_rollup_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("setter_rollup_operation",
StringType(), True),
StructField("aggregated_metric_name",
StringType(), True),
StructField("pre_hourly_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("pre_hourly_operation",
StringType(), True),
StructField("aggregation_pipeline",
StructType([source, usage,
setters, insert]),
True)
]), True)
metric_id = StructField("metric_id", StringType(), True)
schema = StructType([aggregation_params_map, metric_id])
return schema

    @staticmethod
def transform_specs_rdd_to_df(transform_specs_rdd):
"""convert transform_specs rdd to a dataframe."""
schema = TransformSpecsUtils._get_transform_specs_df_schema()
return TransformUtils._rdd_to_df(transform_specs_rdd, schema)

    @staticmethod
def create_df_from_json(sql_context, jsonpath):
"""create a metric processing df from json file."""
schema = TransformSpecsUtils._get_transform_specs_df_schema()
transform_specs_df = sql_context.read.json(jsonpath, schema)
return transform_specs_df


class MonMetricUtils(TransformUtils):
    """utility methods to transform raw metric."""

    @staticmethod
def _get_mon_metric_json_schema():
"""get the schema of the incoming monasca metric."""
dimensions = ["apache_host", "apache_port", "component",
"consumer_group", "device", "hostname",
"mode", "mount", "mount_point", "observer_host",
"process_name", "project_id", "resource_id", "service",
"test_type", "tenantId", "tenant_id", "topic", "url",
"state", "state_description", "instanceId",
"namespace", "pod_name", "app", "container_name",
"interface", "deployment", "daemon_set"]
dimensions_struct_fields = [
StructField(field_name, StringType(), True)
for field_name in dimensions]
value_meta = ["host"]
value_meta_struct_fields = [
StructField(field_name, StringType(), True)
for field_name in value_meta]
metric_struct_field = StructField(
"metric",
StructType([StructField("dimensions",
StructType(dimensions_struct_fields)),
StructField("value_meta",
StructType(value_meta_struct_fields)),
StructField("name", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("value", StringType(), True)]), True)
meta_struct_field = StructField(
"meta",
StructType([StructField("timestamp", StringType(), True),
StructField("region", StringType(), True),
StructField("tenantId", StringType(), True),
StructField("userId", StringType(), True),
StructField("zone", StringType(), True),
StructField("geolocation", StringType(), True)]))
creation_time_struct_field = StructField("creation_time",
StringType(), True)
schema = StructType([creation_time_struct_field,
meta_struct_field, metric_struct_field])
return schema

    @staticmethod
def create_mon_metrics_df_from_json_rdd(sql_context, jsonrdd):
"""create mon metrics df from json rdd."""
schema = MonMetricUtils._get_mon_metric_json_schema()
mon_metrics_df = sql_context.read.json(jsonrdd, schema)
return mon_metrics_df


class PreTransformSpecsUtils(TransformUtils):
    """utility methods to transform pre_transform_specs."""

    @staticmethod
def _get_pre_transform_specs_df_schema():
"""get pre_transform_specs df schema."""
# FIXME: change when pre_transform_specs df is finalized
event_type = StructField("event_type", StringType(), True)
metric_id_list = StructField("metric_id_list",
ArrayType(StringType(),
containsNull=False),
True)
required_raw_fields_list = StructField("required_raw_fields_list",
ArrayType(StringType(),
containsNull=False),
True)
service_id = StructField("service_id", StringType(), True)
event_processing_params = \
StructField("event_processing_params",
StructType([StructField("set_default_zone_to",
StringType(), True),
StructField("set_default_geolocation_to",
StringType(), True),
StructField("set_default_region_to",
StringType(), True),
]), True)
schema = StructType([event_processing_params, event_type,
metric_id_list, required_raw_fields_list,
service_id])
return schema

    @staticmethod
def pre_transform_specs_rdd_to_df(pre_transform_specs_rdd):
"""convert pre_transform_specs processing rdd to a dataframe."""
schema = PreTransformSpecsUtils._get_pre_transform_specs_df_schema()
return TransformUtils._rdd_to_df(pre_transform_specs_rdd, schema)

    @staticmethod
def create_df_from_json(sql_context, jsonpath):
"""create a pre_transform_specs df from json file."""
schema = PreTransformSpecsUtils._get_pre_transform_specs_df_schema()
pre_transform_specs_df = sql_context.read.json(jsonpath, schema)
return pre_transform_specs_df


class GroupingResultsUtils(TransformUtils):
    """utility methods to transform grouping results data."""

    @staticmethod
def _get_grouping_results_df_schema(group_by_column_list):
"""get grouping results schema."""
group_by_field_list = [StructField(field_name, StringType(), True)
for field_name in group_by_column_list]
# Initialize columns for string fields
columns = ["firstrecord_timestamp_string",
"lastrecord_timestamp_string"]
columns_struct_fields = [StructField(field_name, StringType(), True)
for field_name in columns]
# Add columns for non-string fields
columns_struct_fields.append(StructField("firstrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("lastrecord_timestamp_unix",
DoubleType(), True))
columns_struct_fields.append(StructField("firstrecord_quantity",
DoubleType(), True))
columns_struct_fields.append(StructField("lastrecord_quantity",
DoubleType(), True))
columns_struct_fields.append(StructField("record_count",
DoubleType(), True))
instance_usage_schema_part = StructType(columns_struct_fields)
grouping_results = \
StructType([StructField("grouping_key",
StringType(), True),
StructField("results",
instance_usage_schema_part,
True),
StructField("grouping_key_dict",
StructType(group_by_field_list))])
# schema = \
# StructType([StructField("GroupingResults", grouping_results)])
return grouping_results

    @staticmethod
def grouping_results_rdd_to_df(grouping_results_rdd, group_by_list):
"""convert record store rdd to a dataframe."""
schema = GroupingResultsUtils._get_grouping_results_df_schema(
group_by_list)
return TransformUtils._rdd_to_df(grouping_results_rdd, schema)