# Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from pyspark import SparkConf
from pyspark import SparkContext

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition
from pyspark.streaming import StreamingContext

from pyspark.sql.functions import explode
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import lit
from pyspark.sql.functions import when
from pyspark.sql import SQLContext

import json
import logging
from oslo_config import cfg
import simport
import time

from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.transform.builder.generic_transform_builder \
    import GenericTransformBuilder

from monasca_transform.data_driven_specs.data_driven_specs_repo \
    import DataDrivenSpecsRepo

from monasca_transform.data_driven_specs.data_driven_specs_repo \
    import DataDrivenSpecsRepoFactory

from monasca_transform.transform import RddTransformContext
from monasca_transform.transform.transform_utils import MonMetricUtils
from monasca_transform.transform import TransformContextUtils

ConfigInitializer.basic_config()

# initialize logger
log = logging.getLogger(__name__)
_h = logging.FileHandler('%s/monasca_transform.log' %
                         cfg.CONF.service.service_log_path)
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
                                  "%(lineno)s - %(levelname)s - %(message)s'"))
log.addHandler(_h)
log.setLevel(logging.DEBUG)


class MonMetricsKafkaProcessor(object):
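    """Transform monasca metrics arriving from Kafka.

    Reads raw metrics from a Spark Streaming direct Kafka stream,
    validates and converts them into the intermediate record_store
    format, drives the data driven aggregation pipeline and keeps
    track of the Kafka offsets that have been processed.
    """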

    @staticmethod
    def log_debug(message):
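        """Print a message to stdout and also log it at debug level."""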
        print(message)
        log.debug(message)

    @staticmethod
    def store_offset_ranges(rdd):
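        """Attach this batch's Kafka offset ranges to every RDD element.

        The offset ranges are only available on the RDD created directly
        from the Kafka stream, so they are captured here and carried
        along in a transform context wrapped around each element.
        """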
        if rdd.isEmpty():
            MonMetricsKafkaProcessor.log_debug(
                "storeOffsetRanges: nothing to process...")
            return rdd
        else:
            my_offset_ranges = rdd.offsetRanges()
            transform_context = \
                TransformContextUtils.get_context(offset_info=my_offset_ranges)
            rdd_transform_context = \
                rdd.map(lambda x: RddTransformContext(x, transform_context))
            return rdd_transform_context

    @staticmethod
    def print_offset_ranges(my_offset_ranges):
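        """Print topic, partition, from and until offset for each range."""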
        for o in my_offset_ranges:
            print("printOffSetRanges: %s %s %s %s" % (
                o.topic, o.partition, o.fromOffset, o.untilOffset))

    @staticmethod
    def get_kafka_stream(topic, streaming_context):
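        """Create a direct Kafka stream for the given topic.

        If offsets were saved by a previous run they are loaded from the
        configured offset repository and passed to createDirectStream so
        that consumption resumes where the last run stopped; otherwise
        the stream is created without specifying offsets.
        """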
        offset_specifications = simport.load(cfg.CONF.repositories.offsets)()
        saved_offset_spec = offset_specifications.get_kafka_offsets()
        app_name = streaming_context.sparkContext.appName
        if len(saved_offset_spec) < 1:

            MonMetricsKafkaProcessor.log_debug(
                "No saved offsets available... "
                "connecting to kafka without specifying offsets")
            kvs = KafkaUtils.createDirectStream(
                streaming_context, [topic],
                {"metadata.broker.list": cfg.CONF.messaging.brokers})

            return kvs

        else:
            from_offsets = {}
            for key, value in saved_offset_spec.items():
                if key.startswith("%s_%s" % (app_name, topic)):
                    # spec_app_name = value.get_app_name()
                    spec_topic = value.get_topic()
                    spec_partition = int(value.get_partition())
                    # spec_from_offset = value.get_from_offset()
                    spec_until_offset = value.get_until_offset()
                    # composite_key = "%s_%s_%s" % (spec_app_name,
                    #                               spec_topic,
                    #                               spec_partition)
                    # partition = saved_offset_spec[composite_key]
                    from_offsets[
                        TopicAndPartition(spec_topic, spec_partition)
                    ] = long(spec_until_offset)

            MonMetricsKafkaProcessor.log_debug(
                "get_kafka_stream: calling createDirectStream :"
                " topic:{%s} : start " % topic)
            for key, value in from_offsets.items():
                MonMetricsKafkaProcessor.log_debug(
                    "get_kafka_stream: calling createDirectStream : "
                    "offsets : TopicAndPartition:{%s,%s}, value:{%s}" %
                    (str(key._topic), str(key._partition), str(value)))
            MonMetricsKafkaProcessor.log_debug(
                "get_kafka_stream: calling createDirectStream : "
                "topic:{%s} : done" % topic)

            kvs = KafkaUtils.createDirectStream(
                streaming_context, [topic],
                {"metadata.broker.list": cfg.CONF.messaging.brokers},
                from_offsets)
            return kvs

    @staticmethod
    def save_rdd_contents(rdd):
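        """Save the RDD contents to a timestamped text file."""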
        file_name = "".join((
            "/vagrant_home/uniq_metrics",
            '-', time.strftime("%Y-%m-%d-%H-%M-%S"),
            '-', str(rdd.id()),  # rdd.id() returns the RDD's unique id
            '.log'))
        rdd.saveAsTextFile(file_name)

    @staticmethod
    def print_unique_metric_count(kvs):
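        """Count and print the number of occurrences of each metric name."""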
        # print unique metric count
        lines = kvs.map(lambda x: x[1])
        counts = lines.map(
            lambda x: json.loads(x)["metric"]["name"]
        ).map(
            lambda name: (name, 1)
        ).reduceByKey(
            lambda a, b: a + b)
        counts.pprint(9999)

    @staticmethod
    def save_kafka_offsets(current_offsets, app_name):
        """save current offsets to offset specification."""
        # load the configured offset specifications repository
        offset_specs = simport.load(cfg.CONF.repositories.offsets)()

        for o in current_offsets:
            MonMetricsKafkaProcessor.log_debug(
                "adding offset: topic:{%s}, partition:{%s}, fromOffset:{%s}, "
                "untilOffset:{%s}" % (
                    o.topic, o.partition, o.fromOffset, o.untilOffset))
            offset_specs.add(
                app_name, o.topic, o.partition, o.fromOffset, o.untilOffset)

    @staticmethod
    def reset_kafka_offsets():
        """delete all offsets from the offset specification."""
        # load the configured offset specifications repository
        offset_specs = simport.load(cfg.CONF.repositories.offsets)()
        offset_specs.delete_all_kafka_offsets()

    @staticmethod
    def _validate_raw_mon_metrics(row):
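        """Check that all required raw fields are present and non-empty.

        Each field listed in required_raw_fields_list is looked up on the
        row itself, then under metric.dimensions, then under meta. If any
        required field is missing or empty the row is not returned, so it
        gets dropped by the surrounding filter().
        """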
        required_fields = row.required_raw_fields_list

        invalid_list = []

        for required_field in required_fields:
            required_field_value = None

            # Look for the field in the first layer of the row
            try:
                required_field_value = eval(".".join(("row", required_field)))
            except Exception:
                pass

            if ((required_field_value is None or
                    required_field_value == "") and
                    row.metric is not None and
                    row.metric.dimensions is not None):
                # Look for the field in the dimensions layer of the row
                try:
                    required_field_value = eval(
                        ".".join(("row.metric.dimensions", required_field)))
                except Exception:
                    pass

            if ((required_field_value is None or
                    required_field_value == "") and
                    row.meta is not None):
                # Look for the field in the meta layer of the row
                try:
                    required_field_value = eval(
                        ".".join(("row.meta", required_field)))
                except Exception:
                    pass

            if required_field_value is None \
                    or required_field_value == "":
                invalid_list.append("invalid")

        if len(invalid_list) <= 0:
            return row
        else:
            # returning nothing drops this row from the filtered rdd
            print("_validate_raw_mon_metrics : found invalid : ** %s: %s" % (
                (".".join(("row", required_field))),
                required_field_value))

    @staticmethod
    def process_metric(transform_context, record_store_df):
        """process (aggregate) metric data from record_store data

        All the parameters to drive processing should be available
        in transform_spec_df dataframe.
        """
        # call processing chain
        GenericTransformBuilder.do_transform(transform_context,
                                             record_store_df)

    @staticmethod
    def process_metrics(transform_context, record_store_df):
        """start processing (aggregating) metrics"""
        #
        # look in record_store_df for list of metrics to be processed
        #
        metric_ids_df = record_store_df.select("metric_id").distinct()
        metric_ids_to_process = [row.metric_id
                                 for row in metric_ids_df.collect()]

        data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
            get_data_driven_specs_repo()
        sqlc = SQLContext.getOrCreate(record_store_df.rdd.context)
        transform_specs_df = data_driven_specs_repo.get_data_driven_specs(
            sql_context=sqlc,
            data_driven_spec_type=DataDrivenSpecsRepo.transform_specs_type)

        for metric_id in metric_ids_to_process:
            transform_spec_df = transform_specs_df.select(
                ["aggregation_params_map", "metric_id"]
            ).where(transform_specs_df.metric_id == metric_id)
            source_record_store_df = record_store_df.select("*").where(
                record_store_df.metric_id == metric_id)

            # set transform_spec_df in TransformContext
            transform_context = \
                TransformContextUtils.get_context(
                    transform_context_info=transform_context,
                    transform_spec_df_info=transform_spec_df)

            MonMetricsKafkaProcessor.process_metric(
                transform_context, source_record_store_df)

    @staticmethod
    def rdd_to_recordstore(rdd_transform_context_rdd):
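        """Convert a batch of raw metrics to record_store format and process it.

        Builds a dataframe from the raw JSON metrics, keeps only the
        metrics named in pre_transform_specs, validates required fields,
        explodes metric_id_list into one record per intermediate metric,
        maps everything to the common record_store columns, starts the
        aggregation and finally saves the Kafka offsets for this batch.
        """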
        if rdd_transform_context_rdd.isEmpty():
            MonMetricsKafkaProcessor.log_debug(
                "rdd_to_recordstore: nothing to process...")
        else:

            sql_context = SQLContext(rdd_transform_context_rdd.context)
            data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
                get_data_driven_specs_repo()
            pre_transform_specs_df = data_driven_specs_repo.\
                get_data_driven_specs(
                    sql_context=sql_context,
                    data_driven_spec_type=DataDrivenSpecsRepo.
                    pre_transform_specs_type)

            #
            # extract second column containing raw metric data
            #
            raw_mon_metrics = rdd_transform_context_rdd.map(
                lambda nt: nt.rdd_info[1])

            #
            # convert raw metric data rdd to dataframe rdd
            #
            raw_mon_metrics_df = \
                MonMetricUtils.create_mon_metrics_df_from_json_rdd(
                    sql_context,
                    raw_mon_metrics)

            #
            # filter out unwanted metrics and keep only the metrics we
            # are interested in
            #
            cond = [
                raw_mon_metrics_df.metric.name ==
                pre_transform_specs_df.event_type]
            filtered_metrics_df = raw_mon_metrics_df.join(
                pre_transform_specs_df, cond)

            #
            # validate filtered metrics to check that required fields
            # are present and not empty.
            # In order to apply the filter function the dataframe has to
            # be converted to a plain rdd. After validation the rdd is
            # converted back to a dataframe.
            #
            # FIXME: find a way to apply filter function on dataframe rdd data
            validated_mon_metrics_rdd = filtered_metrics_df.rdd.filter(
                MonMetricsKafkaProcessor._validate_raw_mon_metrics)
            validated_mon_metrics_df = sql_context.createDataFrame(
                validated_mon_metrics_rdd, filtered_metrics_df.schema)

            #
            # record generator
            # generate a new intermediate metric record if a given metric's
            # metric_id_list in the pre_transform_specs table has several
            # intermediate metrics defined.
            # intermediate metrics are used as a convenient way to
            # process (aggregate) a metric in multiple ways by making a
            # copy of the source data for each processing
            #
            gen_mon_metrics_df = validated_mon_metrics_df.select(
                validated_mon_metrics_df.meta,
                validated_mon_metrics_df.metric,
                validated_mon_metrics_df.event_processing_params,
                validated_mon_metrics_df.event_type,
                explode(validated_mon_metrics_df.metric_id_list).alias(
                    "this_metric_id"),
                validated_mon_metrics_df.service_id)

            #
            # transform metrics data to record_store format.
            # record store format is the common format which serves as the
            # source for aggregation processing.
            # converting the metrics to a common standard format makes it
            # possible to write generic, reusable aggregation routines
            # driven by configuration parameters
            #
            record_store_df = gen_mon_metrics_df.select(
                (gen_mon_metrics_df.metric.timestamp / 1000).alias(
                    "event_timestamp_unix"),
                from_unixtime(
                    gen_mon_metrics_df.metric.timestamp / 1000).alias(
                    "event_timestamp_string"),
                gen_mon_metrics_df.event_type.alias("event_type"),
                gen_mon_metrics_df.event_type.alias("event_quantity_name"),
                (gen_mon_metrics_df.metric.value / 1.0).alias(
                    "event_quantity"),
                when(gen_mon_metrics_df.metric.dimensions.state != '',
                     gen_mon_metrics_df.metric.dimensions.state).otherwise(
                    'NA').alias("event_status"),
                lit('1.0').alias('event_version'),
                lit('metrics').alias("record_type"),

                # resource_uuid
                when(gen_mon_metrics_df.metric.dimensions.instanceId != '',
                     gen_mon_metrics_df.metric.dimensions.instanceId).when(
                    gen_mon_metrics_df.metric.dimensions.resource_id != '',
                    gen_mon_metrics_df.metric.dimensions.resource_id).
                otherwise('NA').alias("resource_uuid"),

                when(gen_mon_metrics_df.metric.dimensions.tenantId != '',
                     gen_mon_metrics_df.metric.dimensions.tenantId).when(
                    gen_mon_metrics_df.metric.dimensions.tenant_id != '',
                    gen_mon_metrics_df.metric.dimensions.tenant_id).when(
                    gen_mon_metrics_df.metric.dimensions.project_id != '',
                    gen_mon_metrics_df.metric.dimensions.project_id).otherwise(
                    'NA').alias("tenant_id"),

                when(gen_mon_metrics_df.metric.dimensions.mount != '',
                     gen_mon_metrics_df.metric.dimensions.mount).otherwise(
                    'NA').alias("mount"),

                when(gen_mon_metrics_df.metric.dimensions.device != '',
                     gen_mon_metrics_df.metric.dimensions.device).otherwise(
                    'NA').alias("device"),

                when(gen_mon_metrics_df.meta.userId != '',
                     gen_mon_metrics_df.meta.userId).otherwise('NA').alias(
                    "user_id"),

                when(gen_mon_metrics_df.meta.region != '',
                     gen_mon_metrics_df.meta.region).when(
                    gen_mon_metrics_df.event_processing_params
                    .set_default_region_to != '',
                    gen_mon_metrics_df.event_processing_params
                    .set_default_region_to).otherwise(
                    'NA').alias("region"),

                when(gen_mon_metrics_df.meta.zone != '',
                     gen_mon_metrics_df.meta.zone).when(
                    gen_mon_metrics_df.event_processing_params
                    .set_default_zone_to != '',
                    gen_mon_metrics_df.event_processing_params
                    .set_default_zone_to).otherwise(
                    'NA').alias("zone"),

                when(gen_mon_metrics_df.metric.dimensions.hostname != '',
                     gen_mon_metrics_df.metric.dimensions.hostname).when(
                    gen_mon_metrics_df.metric.value_meta.host != '',
                    gen_mon_metrics_df.metric.value_meta.host).otherwise(
                    'NA').alias("host"),

                when(gen_mon_metrics_df.service_id != '',
                     gen_mon_metrics_df.service_id).otherwise(
                    'NA').alias("service_group"),

                when(gen_mon_metrics_df.service_id != '',
                     gen_mon_metrics_df.service_id).otherwise(
                    'NA').alias("service_id"),

                from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
                              'yyyy-MM-dd').alias("event_date"),
                from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
                              'HH').alias("event_hour"),
                from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
                              'mm').alias("event_minute"),
                from_unixtime(gen_mon_metrics_df.metric.timestamp / 1000,
                              'ss').alias("event_second"),
                gen_mon_metrics_df.this_metric_id.alias("metric_group"),
                gen_mon_metrics_df.this_metric_id.alias("metric_id"))

            #
            # get transform context
            #
            rdd_transform_context = rdd_transform_context_rdd.first()
            transform_context = rdd_transform_context.transform_context_info

            #
            # start processing metrics available in record_store data
            #
            MonMetricsKafkaProcessor.process_metrics(transform_context,
                                                     record_store_df)

            #
            # extract kafka offsets stored in rdd and save
            #
            offsets = transform_context.offset_info

            for o in offsets:
                MonMetricsKafkaProcessor.log_debug(
                    "going to save: OffSetRanges: %s %s %s %s" % (
                        o.topic, o.partition, o.fromOffset, o.untilOffset))
            MonMetricsKafkaProcessor.save_kafka_offsets(
                offsets, rdd_transform_context_rdd.context.appName)

    @staticmethod
    def transform_to_recordstore(kvs):
        """Transform metrics data from kafka to record store format.

        Extracts, validates, filters and expands the data read from
        kafka so that only data that has to be aggregated is kept. The
        record generation step creates multiple records for the same
        incoming metric if the metric has multiple intermediate metrics
        defined, so that each intermediate metric can be processed
        independently.
        """
        # capture kafka offset ranges before any other operation is
        # applied to the stream
        # http://spark.apache.org/docs/latest/streaming-kafka-integration.html
        # Note that the typecast to HasOffsetRanges will only succeed if it is
        # done in the first method called on the directKafkaStream, not later
        # down a chain of methods. You can use transform() instead of
        # foreachRDD() as your first method call in order to access offsets,
        # then call further Spark methods. However, be aware that the
        # one-to-one mapping between RDD partition and Kafka partition does
        # not remain after any methods that shuffle or repartition,
        # e.g. reduceByKey() or window()
        kvs.transform(
            MonMetricsKafkaProcessor.store_offset_ranges
        ).foreachRDD(MonMetricsKafkaProcessor.rdd_to_recordstore)


def invoke():
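    """Set up the Spark streaming context and start consuming from kafka."""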
    # load configuration
    ConfigInitializer.basic_config()

    # app name
    application_name = "mon_metrics_kafka"

    my_spark_conf = SparkConf().setAppName(application_name)

    spark_context = SparkContext(conf=my_spark_conf)

    # read at the configured interval
    spark_streaming_context = \
        StreamingContext(spark_context, cfg.CONF.service.stream_interval)

    kafka_stream = MonMetricsKafkaProcessor.get_kafka_stream(
        cfg.CONF.messaging.topic,
        spark_streaming_context)

    # transform to recordstore
    MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)

    # print unique metric count
    MonMetricsKafkaProcessor.print_unique_metric_count(kafka_stream)

    # catch interrupt, stop streaming context gracefully
    # signal.signal(signal.SIGINT, signal_handler)

    # start processing
    spark_streaming_context.start()

    # FIXME: stop spark context to relinquish resources

    # FIXME: specify cores, so as not to use all the resources on the cluster.

    # FIXME: HA deploy multiple masters, may be one on each control node

    try:
        # Wait for the Spark driver to "finish"
        spark_streaming_context.awaitTermination()
    except Exception as e:
        MonMetricsKafkaProcessor.log_debug(
            "Exception raised during Spark execution : " + str(e))
        # One exception that can occur here is the result of the saved
        # kafka offsets being obsolete/out of range. Delete the saved
        # offsets to improve the chance of success on the next execution.
        MonMetricsKafkaProcessor.log_debug(
            "Deleting saved offsets for chance of success on next execution")
        MonMetricsKafkaProcessor.reset_kafka_offsets()


if __name__ == "__main__":
    invoke()