Add basic analytics for Monasca metrics

This commit introduces three examples of LDPs that process
Monasca metrics:

 * `Aggregate` - compute simple stats per metric name.
 * `Combine` - allow mixing of different metrics by providing
   a simple expression parser; the parsed expressions are compiled
   into CPython bytecode (see the sketch after this list).
 * `Derivative` - compute a metric derivative.
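
As a rough illustration of the `Combine` semantics, the configured
expression is evaluated against the bound metric values at each
timestamp. The sketch below uses `eval` as a stand-in for the generated
bytecode, and the numeric values are made up; the name binding matches
the sample configuration further down.

binding = {"a": "cpu.idle_perc", "b": "cpu.total_logical_cores"}
expression = "a * b"

def combine(metric_values):
    # metric_values maps metric names to their (interpolated) values.
    env = {name: metric_values[metric] for name, metric in binding.items()}
    return eval(expression, {"__builtins__": {}}, env)  # stand-in for bytecode

print(combine({"cpu.idle_perc": 75.0, "cpu.total_logical_cores": 4.0}))  # 300.0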

While quite simple, these examples will help us improve the
framework by highlighting some of the problems we will face when
working on more complex solutions, such as:

 * Spark's inability to share data between sliding windows.
 * Ordering of data.
 * Sampling of metrics / events.

Change-Id: I259022f20e9b288aa2a08c24ad4a5f41a20e6095
This commit is contained in:
Joan Varvenne 2016-06-24 15:06:52 +01:00 committed by Roland Hochmuth
parent e8c50b3993
commit 3f5fc11f31
50 changed files with 1730 additions and 140 deletions

View File

@ -5,7 +5,7 @@ ifeq (testspec,$(firstword $(MAKECMDGOALS)))
$(eval $(TEST_ARGS):;@:)
endif
MONANAS_SRC=monasca_analytics
PYTHON=python
all: test style
@ -17,14 +17,14 @@ testspec:
$(PYTHON) -m unittest -v $(TEST_ARGS)
clean:
find . -type f -name '*.pyc' -exec rm {} +
find $(MONANAS_SRC) -type f -name '*.pyc' -exec rm {} +
style:
find . -type f -name '*.py' -exec pep8 --max-line-length 79 {} +
find $(MONANAS_SRC) -type f -name '*.py' -exec pep8 --max-line-length 79 {} +
start:
bash -c "sleep 5; curl -H \"Content-Type: application/json\" -d '{\"action\": \"start_streaming\"}' http://localhost:3000/" &
$(PYTHON) run.py -p ~/spark -c ./config/markov_source_config.json -l ./config/logging.json
bash -c "sleep 7; curl -H \"Content-Type: application/json\" -d '{\"action\": \"start_streaming\"}' http://localhost:3000/" &
$(PYTHON) run.py -p ~/spark -c ./config/metric_experiments.json -l ./config/logging.json
.PHONY: all test clean style testspec

View File

@ -0,0 +1,70 @@
{
"spark_config": {
"appName": "testApp",
"streaming": {
"batch_interval": 1
}
},
"server": {
"port": 3000,
"debug": false
},
"sources": {
"src1": {
"module": "MonascaMarkovChainSource",
"params": {
"server_sleep_in_seconds": 0.01
}
}
},
"ingestors": {},
"smls": {},
"voters": {},
"sinks": {
"snk2": {
"module": "StdoutSink"
},
"snk3": {
"module": "FileSink",
"params": {
"path": "~/monasca-aggregate.log"
}
}
},
"ldps": {
"ldp3": {
"module": "MonascaAggregateLDP",
"params": {
"aggregation_period": 2,
"aggregation_function": "cnt"
}
},
"ldp4": {
"module": "MonascaCombineLDP",
"params": {
"metric_name": "cpu.logical_cores_actives",
"combine_period": 1,
"lambda": "a * b",
"metric_names_binding": {
"a": "cpu.idle_perc",
"b": "cpu.total_logical_cores"
}
}
},
"ldp5": {
"module": "MonascaDerivativeLDP",
"params": {
"derivative_period": 1
}
}
},
"connections": {
"src1": ["ldp5"],
"ldp3": [],
"ldp4": [],
"ldp5": ["snk2"],
"snk2": [],
"snk3": []
},
"feedback": {}
}

View File

@ -0,0 +1,76 @@
{
"spark_config": {
"appName": "testApp",
"streaming": {
"batch_interval": 1
}
},
"server": {
"port": 3000,
"debug": false
},
"sources": {
"src1": {
"module": "MonascaMarkovChainSource",
"params": {
"server_sleep_in_seconds": 0.01
}
}
},
"ingestors": {},
"smls": {},
"voters": {},
"sinks": {
"snk1": {
"module": "KafkaSink",
"params": {
"host": "localhost",
"port": 9092,
"topic": "monasca_experiments"
}
},
"snk2": {
"module": "StdoutSink"
}
},
"ldps": {
"ldp1": {
"module": "MonascaAggregateLDP",
"params": {
"aggregation_period": 2,
"aggregation_function": "max"
}
},
"ldp2": {
"module": "MonascaAggregateLDP",
"params": {
"aggregation_period": 2,
"aggregation_function": "min"
}
},
"ldp3": {
"module": "MonascaAggregateLDP",
"params": {
"aggregation_period": 2,
"aggregation_function": "avg"
}
},
"ldp4": {
"module": "MonascaAggregateLDP",
"params": {
"aggregation_period": 2,
"aggregation_function": "sum"
}
}
},
"connections": {
"src1": ["ldp1", "ldp2", "ldp3", "ldp4"],
"ldp1": ["snk2"],
"ldp2": ["snk2"],
"ldp3": ["snk2"],
"ldp4": ["snk2"],
"snk1": [],
"snk2": []
},
"feedback": {}
}

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from expression import create_fn_with_config
from expression import validate_environment
from expression import validate_expression
from expression import validate_name_binding
# Self-assignments keep the re-exported names referenced so that
# static checkers do not flag the imports above as unused.
create_fn_with_config = create_fn_with_config
validate_expression = validate_expression
validate_environment = validate_environment
validate_name_binding = validate_name_binding

View File

@ -0,0 +1,221 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import types
import monasca_analytics.exception.banana as exception
import peak.util.assembler as ass
import private as priv
import pyparsing as p
logger = logging.getLogger(__name__)
class ExpressionParser(object):
def __init__(self):
"""
Create a parser that parse arithmetic expressions. They can
contains variable identifiers or raw numbers. The meaning
for the identifiers is left to the
"""
number = p.Regex(r'\d+(\.\d*)?([eE]\d+)?')
identifier = p.Word(p.alphas)
terminal = identifier | number
self._expr = p.infixNotation(terminal, [
(p.oneOf('* /'), 2, p.opAssoc.LEFT),
(p.oneOf('+ -'), 2, p.opAssoc.LEFT)
])
def parse(self, string, code=ass.Code()):
"""
Parse a given string and construct an Evaluator
:type string: basestring
:param string: String to parse.
:type code: ass.Code
:param code: Generated code will be written here.
:return: Returns an evaluator that will returns a value
given the appropriate environment to resolve
variables.
"""
tree = self._expr.parseString(string)[0]
self._build_tree(tree, code)
def parse_tree(self, expr):
"""
Parse the given string and return the generated tree
by pyparsing.
:type expr: str
:param expr: Expression to parse.
:return: Returns the generated tree.
"""
return self._expr.parseString(expr)
@staticmethod
def _build_tree(subtree, code):
"""
:type subtree: list
:param subtree: Sub tree to parse
:type code: ass.Code
:param code: Generated code is written here.
"""
operator = filter(priv.is_op, subtree)[0]
pushed_one_stack_value = False
for child in filter(priv.is_not_op, subtree):
if isinstance(child, basestring):
code(ass.Local(child))
if not pushed_one_stack_value:
pushed_one_stack_value = True
else:
ExpressionParser._push_op(operator, code)
else:
ExpressionParser._build_tree(child, code)
if not pushed_one_stack_value:
pushed_one_stack_value = True
else:
ExpressionParser._push_op(operator, code)
@staticmethod
def _push_op(operator, code):
if operator == '+':
code.BINARY_ADD()
elif operator == '-':
code.BINARY_SUBTRACT()
elif operator == '/':
code.BINARY_DIVIDE()
elif operator == '*':
code.BINARY_MULTIPLY()
def create_fn_with_config(env, expr_string):
"""
Create an evaluator given the environment name bindings and the
expression string.
:type env: dict[str, str]
:param env: Environment to use.
:type expr_string: str
:param expr_string: String containing the expression
to be evaluated
:returns: Returns a function that accepts one argument,
expected to be the environment.
"""
code = ass.Code()
# Argument
code(ass.Local('__monanas__env'))
code.co_argcount = 1
# Create local variables
for key, value in env.iteritems():
code(ass.Call(
ass.Getattr(
ass.Local('__monanas__env'), 'get'),
[ass.Const(value)]),
ass.LocalAssign(str(key)))
parser = ExpressionParser()
try:
parser.parse(expr_string, code)
except p.ParseException as e:
raise exception.BananaInvalidExpression(e.message)
code.RETURN_VALUE()
final_fn = types.FunctionType(code.code(), globals())
return final_fn
def validate_environment(env):
"""
Validate the given arguments that create_fn_with_config
is expecting.
:param env: Environment spec
"""
for key, val in env.iteritems():
if not isinstance(key, basestring):
raise exception.BananaEnvironmentError(
"{} is not a valid key (only string are)".format(key)
)
if not isinstance(val, basestring):
raise exception.BananaEnvironmentError(
"{} is not a valid value (only string are)".format(val)
)
def validate_expression(expr_string):
"""
Validate the provided expression string.
:type expr_string: str
:param expr_string: Expression string to validate.
:returns: Returns a handle that can be used to validate
name usage against an environment.
:raises: exception.BananaInvalidExpression
"""
if not isinstance(expr_string, basestring):
raise exception.BananaArgumentTypeError(
expected_type=basestring,
received_type=type(expr_string)
)
parser = ExpressionParser()
try:
res = parser.parse_tree(expr_string)
return ExpressionHandle(res, expr_string)
except p.ParseException as e:
raise exception.BananaInvalidExpression(e.message)
def validate_name_binding(expr_handle, environment):
"""
Validate the name binding in the expr_handle for
the provided environment.
:type expr_handle: ExpressionHandle
:param expr_handle: The expression handle
:type environment: dict
:param environment: The environment
"""
if not isinstance(expr_handle, ExpressionHandle):
raise exception.BananaArgumentTypeError(
expected_type=ExpressionHandle,
received_type=type(expr_handle)
)
def collect_names(subtree):
"""
Collect names used in this subtree
:type subtree: list
:param subtree: subtree
"""
for child in subtree:
if isinstance(child, basestring):
if priv.is_not_op(child):
names.add(child)
else:
collect_names(child)
names = set()
collect_names(expr_handle.tree)
for name_binding in environment.keys():
if name_binding not in names:
raise exception.BananaInvalidExpression(
"This expression ('{}') can't be used with the provided "
"environment: '{}'.\nReason: '{}' is not defined.".format(
expr_handle.original_str,
environment,
name_binding
)
)
class ExpressionHandle(object):
def __init__(self, tree, original_string):
self.tree = tree
self.original_str = original_string
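
A minimal usage sketch of the compiler above (assuming Python 2 with the
`peak.util.assembler` and `pyparsing` dependencies installed; the metric
names come from the sample configuration, the values are made up):

import monasca_analytics.banana as banana

combine = banana.create_fn_with_config(
    env={"a": "cpu.idle_perc", "b": "cpu.total_logical_cores"},
    expr_string="a * b")

# The generated function takes a single dict argument mapping the bound
# metric names to their current values.
print(combine({"cpu.idle_perc": 75.0, "cpu.total_logical_cores": 4.0}))  # 300.0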

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
BANANA_OPERATOR_LIST = ['+', '-', '*', '/']
def is_op(x):
return x in BANANA_OPERATOR_LIST
def is_not_op(x):
return x not in BANANA_OPERATOR_LIST

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Banana Error classes."""
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class BananaException(Exception):
@abc.abstractmethod
def __str__(self):
pass
class BananaInvalidExpression(BananaException):
def __init__(self, value):
self._value = value
def __str__(self):
return repr(self._value)
class BananaEnvironmentError(BananaException):
def __init__(self, value):
self._value = value
def __str__(self):
return repr(self._value)
class BananaArgumentTypeError(BananaException):
def __init__(self, expected_type, received_type):
self._value = "Wrong type of argument: expected '{}' got '{}'".\
format(expected_type.__name__, received_type.__name__)
def __str__(self):
return repr(self._value)

View File

@ -21,6 +21,7 @@ import numpy as np
import schema
from monasca_analytics.ingestor import base
import monasca_analytics.util.spark_func as fn
logger = logging.getLogger(__name__)
@ -39,10 +40,11 @@ class CloudIngestor(base.BaseIngestor):
}).validate(_config)
def map_dstream(self, dstream):
feature_list = list(self._features)
return dstream.map(
lambda rdd_entry: CloudIngestor._process_data(rdd_entry,
feature_list))
features_list = list(self._features)
return dstream.map(fn.from_json)\
.map(lambda rdd_entry: CloudIngestor._process_data(
rdd_entry,
features_list))
@staticmethod
def get_default_config():

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import numpy as np
@ -22,10 +21,11 @@ import schema
from monasca_analytics.ingestor import base
from monasca_analytics.source import iptables_markov_chain as src
import monasca_analytics.util.spark_func as fn
logger = logging.getLogger(__name__)
RDD_EVENTS = "events"
RDD_EVENT = "event"
RDD_CTIME = "ctime"
EVENT_MSG = "msg"
@ -53,10 +53,11 @@ class IptablesIngestor(base.BaseIngestor):
return {"module": IptablesIngestor.__name__}
def map_dstream(self, dstream):
new_dstream = dstream.map(
lambda rdd_entry: IptablesIngestor._process_data(rdd_entry,
self._features))
return new_dstream
features_list = list(self._features)
return dstream.map(fn.from_json)\
.map(lambda rdd_entry: IptablesIngestor._process_data(
rdd_entry,
features_list))
@staticmethod
def _process_data(rdd_entry, feature_list):
@ -65,14 +66,14 @@ class IptablesIngestor(base.BaseIngestor):
Assuming the rdd_entry is encoded in JSON format, this method
gets the events and vectorizes them according to the features.
:type rdd_entry: str
:param rdd_entry: json encoded in a string, containing
the data of an RDD
:type rdd_entry: list[dict]
:param rdd_entry: List of event entries
:type feature_list: list[str]
:param feature_list: features to extract, in order
"""
rdd_json = json.loads(rdd_entry)
events = rdd_json[RDD_EVENTS]
events = []
for event in rdd_entry:
events.append(event[RDD_EVENT])
return IptablesIngestor._vectorize_events(events, feature_list)
@staticmethod

View File

@ -14,14 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import schema
from monasca_analytics.ingestor import iptables as ip_ing
import monasca_analytics.ingestor.iptables as ip_ing
import monasca_analytics.ldp.base as bt
from monasca_analytics.sml import svm_one_class
import monasca_analytics.util.spark_func as fn
logger = logging.getLogger(__name__)
@ -46,23 +46,28 @@ class IptablesLDP(bt.BaseLDP):
def map_dstream(self, dstream):
"""Detect anomalies in a dstream using the learned classifier
:param dstream: pyspark.streaming.DStream
:type dstream: pyspark.streaming.DStream
:param dstream: Spark's DStream
"""
data = self._data
return dstream.flatMap(lambda r:
self._detect_anomalies(r, data))
return dstream.map(fn.from_json)\
.flatMap(lambda r:
self._detect_anomalies(r, data))
def _detect_anomalies(self, rdd_entry, data):
"""Classifies and marks the RDD entry as anomalous or non-anomalous
:type rdd_entry: pyspark.RDD
:type rdd_entry: list[dict]
:param rdd_entry: entry to be classified
:type data: dict
:param data: contains the features and the classifier
"""
rdd_entry = json.loads(rdd_entry)
new_entries = []
events = rdd_entry[ip_ing.RDD_EVENTS]
events = []
ctimes = []
for event in rdd_entry:
events.append(event[ip_ing.RDD_EVENT])
ctimes.append(event["ctime"])
features = data[FEATURES]
classifier = data[MATRIX]
@ -73,7 +78,7 @@ class IptablesLDP(bt.BaseLDP):
Y = classifier.predict(X)
for i in range(len(events)):
event = events[i]
event["ctime"] = rdd_entry["ctime"]
event["ctime"] = ctimes[i]
if Y[0] == svm_one_class.ANOMALY:
event["anomalous"] = True
else:

View File

@ -0,0 +1,61 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import bisect
import time
def interpolate(timestamp, metric_values, metric_timestamps):
"""
:type timestamp: int
:param timestamp: Timestamp at which we want to interpolate or
extrapolate the metric value.
:type metric_values: list[dict]
:param metric_values: List of metrics
:type metric_timestamps: list[int]
:param metric_timestamps: List of timestamp for the given metric.
:rtype: float
:return: Returns the interpolated value
"""
insertion_pos = bisect.bisect_left(metric_timestamps, timestamp)
# Edge cases:
if insertion_pos == 0:
return metric_values[0]["metric"]["value"]
if insertion_pos >= len(metric_timestamps):
return metric_values[-1]["metric"]["value"]
if metric_timestamps[insertion_pos] == timestamp:
return metric_values[insertion_pos]["metric"]["value"]
# General case:
lo = metric_timestamps[insertion_pos - 1]
hi = metric_timestamps[insertion_pos]
dt = hi - lo
return metric_values[insertion_pos - 1]["metric"]["value"] *\
(hi - timestamp) / dt + \
metric_values[insertion_pos]["metric"]["value"] *\
(timestamp - lo) / dt
def create_agg_metric(metric_name, meta, dimensions, timestamp, value):
return {
"metric": {
"name": metric_name,
"dimensions": dimensions,
"timestamp": timestamp,
"value": value
},
"meta": meta,
"creation_time": int(time.time())
}
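
A small hand check of the interpolation helper; the metric dicts are
reduced to the single field that `interpolate` reads, and the numbers
are made up:

metric_values = [{"metric": {"value": v}} for v in (10.0, 20.0, 40.0)]
metric_timestamps = [10, 20, 30]
# Halfway between the samples at t=10 (10.0) and t=20 (20.0).
print(interpolate(15, metric_values, metric_timestamps))  # 15.0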

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import schema
import monasca_analytics.ldp.base as bt
import monasca_analytics.ldp.monasca.helpers as helpers
import monasca_analytics.util.spark_func as fn
logger = logging.getLogger(__name__)
class MonascaAggregateLDP(bt.BaseLDP):
"""Monasca aggregator live data processor"""
def __init__(self, _id, _config):
super(MonascaAggregateLDP, self).__init__(_id, _config)
self._aggregation_period = _config["params"]["aggregation_period"]
self._reducer_func = MonascaAggregateLDP.select_reducer(_config)
self._suffix = "_" + _config["params"]["aggregation_function"]
@staticmethod
def validate_config(_config):
return schema.Schema({
"module": schema.And(basestring,
lambda i: not any(c.isspace() for c in i)),
"params": {
"aggregation_period": int,
"aggregation_function": schema.Or(
"avg",
"max",
"sum",
"min",
"cnt"
)
}
})
@staticmethod
def get_default_config():
return {
"module": MonascaAggregateLDP.__name__,
"params": {
# One hour
"aggregation_period": 60 * 60,
"aggregation_function": "avg"
}
}
def map_dstream(self, dstream):
"""
Map the given DStream into a new DStream where metrics
have been aggregated by name.
:type dstream: pyspark.streaming.DStream
:param dstream: DStream
:return: Returns the stream of aggregated metrics
"""
red = self._reducer_func
suf = self._suffix
agg_period = self._aggregation_period
# TODO(Joan): Add a filter to only aggregate some metrics
# TODO(Joan): or particular dimensions
return dstream.map(fn.from_json) \
.window(agg_period, agg_period) \
.map(lambda metric: (metric["metric"]["name"], metric)) \
.groupByKey() \
.flatMapValues(lambda metrics: MonascaAggregateLDP.aggregate(
metrics,
red,
suf
))\
.map(lambda metric_and_name: metric_and_name[1])
@staticmethod
def aggregate(all_metrics, reducer, suffix):
"""
Aggregate values produced by different providers together.
The metric name is assumed to be the same for all providers.
:type all_metrics: list[dict]
:param all_metrics: Metric values to aggregate, all sharing the
same metric name.
:type reducer: (float, float, float) -> float
:param reducer: Function combining the accumulated value, the next
(interpolated) metric value and the number of metric series.
:type suffix: str
:param suffix: Suffix to append to the metric name in its combined form
"""
# Collect metric separately
separated_metrics = {} # type: dict[frozenset, list[dict]]
for el in all_metrics:
key = frozenset(el["metric"]["dimensions"].items())
if key not in separated_metrics:
separated_metrics[key] = [el]
else:
separated_metrics[key].append(el)
# Collect all dimensions
dims = {}
for metric_dims in separated_metrics.keys():
for prop, val in dict(metric_dims).iteritems():
if prop in dims:
dims[prop].add(val)
else:
dims[prop] = {val}
# Sort each metric
for _, metric in separated_metrics.iteritems():
metric.sort(key=lambda v: v["metric"]["timestamp"])
separated_metrics = sorted(separated_metrics.values(), key=len)
separated_metrics.reverse()
# Compute the new values
new_values = []
all_timestamps = map(
lambda l: map(
lambda x: x["metric"]["timestamp"], l),
separated_metrics)
metric_count = len(separated_metrics)
for index in xrange(0, len(separated_metrics[0])):
new_value = separated_metrics[0][index]["metric"]["value"]
new_timestamp = separated_metrics[0][index]["metric"]["timestamp"]
for metric_index in xrange(1, metric_count):
new_value = reducer(new_value, helpers.interpolate(
new_timestamp,
separated_metrics[metric_index],
all_timestamps[metric_index]
), metric_count)
new_values.append((new_timestamp, new_value))
# Aggregate the other details:
metric_name = separated_metrics[0][0]["metric"]["name"] + suffix
meta = separated_metrics[0][0]["meta"]
new_metrics = [
helpers.create_agg_metric(
metric_name,
meta,
dims,
val[0],
val[1]
) for val in new_values
]
return new_metrics
@staticmethod
def select_reducer(_config):
return {
"avg": lambda acc, m, cnt: m["metric"]["value"] / cnt + acc,
"max": lambda acc, m, cnt: max(m["metric"]["value"], acc),
"sum": lambda acc, m, cnt: m["metric"]["value"] + acc,
"min": lambda acc, m, cnt: min(m["metric"]["value"], acc),
"cnt": lambda acc, m, cnt: cnt
}[_config["params"]["aggregation_function"]]

View File

@ -0,0 +1,240 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import schema
import monasca_analytics.banana as banana
import monasca_analytics.ldp.base as bt
import monasca_analytics.ldp.monasca.helpers as helpers
import monasca_analytics.util.spark_func as fn
logger = logging.getLogger(__name__)
class MonascaCombineLDP(bt.BaseLDP):
"""Monasca combiner live data processor"""
def __init__(self, _id, _config):
super(MonascaCombineLDP, self).__init__(_id, _config)
logger.debug(_config["params"]["metric_names_binding"])
logger.debug(_config["params"]["lambda"])
self._combine_function = banana.create_fn_with_config(
env=_config["params"]["metric_names_binding"],
expr_string=_config["params"]["lambda"]
)
self._combine_period = _config["params"]["combine_period"]
self._combine_metric_name = _config["params"]["metric_name"]
self._metrics_of_interest = _config["params"][
"metric_names_binding"].values()
def map_dstream(self, dstream):
"""
Map the given DStream into a new DStream where the specified metrics
have been combined together.
:type dstream: pyspark.streaming.DStream
:param dstream: DStream
:return: Returns the stream of combined metrics
"""
combine_fn = self._combine_function
metric_names = self._metrics_of_interest
nb_metrics = len(metric_names)
metric_name = self._combine_metric_name
return dstream.map(fn.from_json)\
.window(self._combine_period, self._combine_period)\
.filter(lambda x: x["metric"]["name"] in metric_names and
x["metric"]["value"] is not None) \
.map(lambda x: (frozenset(x["metric"]["dimensions"]), x))\
.groupByKey()\
.flatMapValues(lambda metrics: MonascaCombineLDP.combine(
metrics,
combine_fn,
metric_name,
nb_metrics
))\
.map(lambda x: x[1])
@staticmethod
def combine(all_metrics, combine_fn, combine_metric_name, nb_of_metrics):
"""
Combine the given metrics of this RDD into one.
:type all_metrics: pyspark.resultiterable.ResultIterable
:param all_metrics: List containing the metrics.
:param combine_fn: Combiner.
:type combine_metric_name: str
:param combine_metric_name: Name of the new metric
:type nb_of_metrics: int
:param nb_of_metrics: The number of metrics expected
"""
# Separate metrics based on name
separated_metrics = {} # type: dict[str, list[dict]]
dims = None
for el in all_metrics:
key = el["metric"]["name"]
if dims is None:
dims = el["metric"]["dimensions"]
if key not in separated_metrics:
separated_metrics[key] = [el]
else:
separated_metrics[key].append(el)
if len(separated_metrics.keys()) != nb_of_metrics:
return []
separated_metrics = sorted(list(separated_metrics.iteritems()),
key=lambda x: len(x[1]))
separated_metrics = separated_metrics # type: list[(str, list[dict])]
# Sort each metric
for metric in separated_metrics:
metric[1].sort(key=lambda v: v["metric"]["timestamp"])
temp_values = []
all_timestamp = map(
lambda l: map(
lambda x: x["metric"]["timestamp"], l[1]),
separated_metrics)
for index in xrange(0, len(separated_metrics[0][1])):
current_env = {
separated_metrics[0][0]:
separated_metrics[0][1][index]["metric"]["value"]
}
timestamp = all_timestamp[0][index]
for metric_index in xrange(1, len(separated_metrics)):
metric_prop = separated_metrics[metric_index]
metric_name = metric_prop[0]
current_env[metric_name] = helpers.interpolate(
timestamp,
metric_prop[1],
all_timestamp[metric_index]
)
temp_values.append(current_env)
new_values = map(combine_fn, temp_values)
new_metrics = [
helpers.create_agg_metric(
combine_metric_name,
{},
dims,
tsmp,
val
) for val, tsmp in zip(new_values, all_timestamp[0])
]
return new_metrics
@staticmethod
def validate_config(_config):
schema.Schema({
"module": schema.And(basestring,
lambda i: not any(c.isspace() for c in i)),
"params": {
"metric_name": basestring,
"combine_period": int,
"lambda": basestring,
"metric_names_binding": {
basestring: schema.Or(
"apache.net.kbytes_sec",
"apache.net.requests_sec",
"apache.performance.cpu_load_perc",
"cpu.idle_perc",
"cpu.stolen_perc",
"cpu.system_perc",
"cpu.total_logical_cores",
"cpu.user_perc",
"cpu.wait_perc",
"disk.allocation",
"disk.inode_used_perc",
"disk.space_used_perc",
"disk.total_space_mb",
"disk.total_used_space_mb",
"host_alive_status",
"io.read_kbytes_sec",
"io.read_req_sec",
"io.write_time_sec",
"kafka.consumer_lag",
"load.avg_1_min",
"load.avg_5_min",
"mem.free_mb",
"mem.swap_free_mb",
"mem.swap_total_mb",
"mem.total_mb",
"mem.usable_mb",
"mem.used_cache",
"metrics-added-to-batch-counter[0]",
"mysql.innodb.buffer_pool_free",
"mysql.innodb.buffer_pool_used",
"mysql.innodb.data_reads",
"mysql.innodb.mutex_spin_rounds",
"mysql.performance.com_delete_multi",
"mysql.performance.com_insert",
"mysql.performance.com_insert_select",
"mysql.performance.com_select",
"mysql.performance.com_update",
"mysql.performance.created_tmp_disk_tables",
"mysql.performance.created_tmp_files",
"mysql.performance.open_files",
"mysql.performance.questions",
"mysql.performance.user_time",
"net.in_bytes_sec",
"net.in_errors_sec",
"net.in_packets_dropped_sec",
"net.in_packets_sec",
"net.out_bytes_sec",
"net.out_errors_sec",
"net.out_packets_dropped_sec",
"net.out_packets_sec",
"nova.vm.disk.total_allocated_gb",
"process.pid_count",
"raw-sql.time.max",
"vcpus",
"vm.cpu.utilization_perc",
"vm.host_alive_status",
"vm.mem.total_mb",
"zookeeper.out_bytes",
"zookeeper.outstanding_bytes"
)
}
}
}).validate(_config)
# Checks the expression and the environment
handle = banana.validate_expression(_config["params"]["lambda"])
banana.validate_name_binding(handle,
_config["params"]["metric_names_binding"])
@staticmethod
def get_default_config():
return {
"module": MonascaCombineLDP.__name__,
"params": {
"metric_name": "cpu.logical_cores_actives",
"combine_period": 1,
"lambda": "a * b",
"metric_names_binding": {
"a": "cpu.idle_perc",
"b": "cpu.total_logical_cores"
}
}
}
@staticmethod
def select_func(_config):
# NOTE: appears to be unused leftover; MonascaCombineLDP's config
# schema defines no "aggregation_function" parameter.
return {
"prod": lambda m: m["metric"]["value"] / m["metric"]["count"],
"pow": lambda m: m["metric"]["max"],
}[_config["params"]["aggregation_function"]]

View File

@ -0,0 +1,146 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import schema
import monasca_analytics.ldp.base as bt
import monasca_analytics.ldp.monasca.helpers as helpers
import monasca_analytics.util.spark_func as fn
logger = logging.getLogger(__name__)
# FIXME: This code is inaccurate because values on "edge" of the RDD
# FIXME: have a computed derivative with less precision than others.
# FIXME: The base idea would be to use the sliding window capability
# FIXME: to compute the derivative with the unbiased variant for all
# FIXME: values. However, we need a way to "know" how many derivative
# FIXME: calculation values need to be skipped from one window to the
# FIXME: other.
# FIXME:
class MonascaDerivativeLDP(bt.BaseLDP):
"""Monasca derivative live data processor"""
def __init__(self, _id, _config):
super(MonascaDerivativeLDP, self).__init__(_id, _config)
self._period = _config["params"]["derivative_period"]
@staticmethod
def validate_config(_config):
return schema.Schema({
"module": schema.And(basestring,
lambda i: not any(c.isspace() for c in i)),
"params": {
# Derivative period in multiple of batch interval
"derivative_period": int
}
})
@staticmethod
def get_default_config():
return {
"module": MonascaDerivativeLDP.__name__,
"params": {
"derivative_period": 1
}
}
def map_dstream(self, dstream):
"""
Map the given DStream into a new DStream where metrics
are replaced with their derivative.
:type dstream: pyspark.streaming.DStream
:param dstream: DStream
:return: Returns the stream of derivative.
"""
period = self._period
return dstream.map(fn.from_json) \
.window(period, period) \
.map(lambda m: ((frozenset(
m["metric"]["dimensions"].items()),
m["metric"]["name"]),
m)) \
.groupByKey() \
.flatMapValues(lambda metric: MonascaDerivativeLDP.derivative(
metric,
)) \
.map(lambda x: x[1])
@staticmethod
def derivative(metric_values):
"""
Compute the time derivative of the given metric values.
:type metric_values: pyspark.resultiterable.ResultIterable[dict]
:param metric_values: The list of metric_values
:return: Returns the derivative of the provided metric.
"""
if len(metric_values) < 2:
return []
metric_name = metric_values.data[0]["metric"]["name"] + "_derivative"
meta = metric_values.data[0]["meta"]
dims = metric_values.data[0]["metric"]["dimensions"]
# All values
timestamps = map(lambda m: m["metric"]["timestamp"], metric_values)
all_values = map(lambda m: m["metric"]["value"], metric_values)
# Sort values
all_values = [y for (x, y) in sorted(zip(timestamps, all_values))]
timestamps = sorted(timestamps)
# Remove duplicates
last_timestamp = timestamps[0]
tmp_all_values = [all_values[0]]
tmp_timestamps = [last_timestamp]
for index in xrange(1, len(timestamps)):
if timestamps[index] == last_timestamp:
continue
else:
last_timestamp = timestamps[index]
tmp_all_values.append(all_values[index])
tmp_timestamps.append(last_timestamp)
all_values = tmp_all_values
timestamps = tmp_timestamps
if len(all_values) < 2:
return []
# Compute the derivative: forward difference at the first point,
# central differences inside, backward difference at the last point.
n = len(all_values) - 1
new_values = [
float(all_values[1] - all_values[0]) /
float(timestamps[1] - timestamps[0])
]
for index in xrange(1, n):
new_values.append(
float(all_values[index + 1] - all_values[index - 1]) /
float(timestamps[index + 1] - timestamps[index - 1])
)
new_values.append(
float(all_values[n] - all_values[n - 1]) /
float(timestamps[n] - timestamps[n - 1])
)
new_metrics = [
helpers.create_agg_metric(
metric_name,
meta,
dims,
tmst,
val
) for val, tmst in zip(new_values, timestamps)
]
return new_metrics
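
To make the difference scheme concrete, a standalone sketch (it does not
call `derivative`, which expects Spark's ResultIterable) computing the
same forward/central/backward differences on a made-up series:

timestamps = [0, 1, 2]
values = [0.0, 10.0, 40.0]
n = len(values) - 1
# Forward difference at the first point.
derivatives = [(values[1] - values[0]) / float(timestamps[1] - timestamps[0])]
for i in range(1, n):
    # Central differences for interior points.
    derivatives.append((values[i + 1] - values[i - 1]) /
                       float(timestamps[i + 1] - timestamps[i - 1]))
# Backward difference at the last point.
derivatives.append((values[n] - values[n - 1]) /
                   float(timestamps[n] - timestamps[n - 1]))
print(derivatives)  # [10.0, 20.0, 30.0]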

View File

@ -0,0 +1,82 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os.path as path
import schema
import tempfile
import time
import monasca_analytics.sink.base as base
class FileSink(base.BaseSink):
"""Sink that prints the dstream to a file in the driver
This sink is for development **only**.
"""
def __init__(self, _id, _config):
super(FileSink, self).__init__(_id, _config)
if "params" in _config:
_path = path.expanduser(_config["params"]["path"])
if path.isdir(_path):
_path = path.join(_path, str(time.time()) + '.log')
self._file_path = _path
else:
self._file_path = tempfile.NamedTemporaryFile().name
def sink_dstream(self, dstream):
"""
Sink the provided DStream into a file.
:type dstream: pyspark.streaming.DStream
:param dstream: DStream to sink
"""
_file_name = self._file_path
def write_output(rdd):
_file = open(_file_name, 'a+')
for rdd_entry in rdd.collect():
_file.write(json.dumps(rdd_entry, indent=4))
_file.close()
dstream.foreachRDD(lambda _, rdd: write_output(rdd))
def sink_ml(self, voter_id, matrix):
pass
@staticmethod
def get_default_config():
return {
"module": FileSink.__name__,
"params": {
"path": None
}
}
@staticmethod
def validate_config(_config):
return schema.Schema({
"module": schema.And(basestring,
lambda i: not any(c.isspace() for c in i)),
schema.Optional("params"): {
"path": schema.And(
basestring,
lambda i: path.exists(path.expanduser(i)) or
path.exists(path.dirname(path.expanduser(i)))
)
}
}).validate(_config)

View File

@ -23,7 +23,7 @@ class StdoutSink(base.BaseSink):
"""Sink that prints the dstream to stdout, using pprint command"""
def sink_dstream(self, dstream):
dstream.pprint()
dstream.pprint(1000)
def sink_ml(self, voter_id, matrix):
pass

View File

@ -76,7 +76,7 @@ class MarkovChainSource(base.BaseSource):
def _start_thread(self, system):
self._server = SocketServer.ThreadingTCPServer(
("", 0), # Let the OS pick a port for us
FMSTCPHandler, # Handler of the
FMSTCPHandler, # Handler of the requests
False)
self._server.allow_reuse_address = True
self._server.server_bind()
@ -128,19 +128,21 @@ class LeafNodes(object):
for s in self._state_nodes:
s.next_state(hour_of_day, ignored_states)
def collect_events(self, hour_of_day):
def collect_events(self, hour_of_day, fake_date, request):
"""Get list of events
:type hour_of_day: int
:param hour_of_day: An hour of the day that is used by
StateNode.collect_event
:type fake_date: datetime.datetime
:param fake_date: A date that you can use to generate a ctime.
:type request: RequestBuilder
:param request: Request object to send data.
Events are sent through the request builder instead of being returned.
"""
events = []
for node in self._state_nodes:
events.extend(node.collect_events(hour_of_day))
return events
node.collect_events(hour_of_day, fake_date, request)
@six.add_metaclass(MetaId)
@ -204,23 +206,23 @@ class StateNode(object):
dep.next_state(hour_of_day, ignored_states)
self._markov_chain.apply_on(self, hour_of_day)
def collect_events(self, hour_of_day):
def collect_events(self, hour_of_day, fake_date, request):
"""Collect event triggered for the next burst.
:type hour_of_day: int
:param hour_of_day: an integer in the range of 0 to 24 to
express the hour of the day.
:param hour_of_day: an integer in the range of 0 to 24 to express
the hour of the day.
:type fake_date: datetime.datetime
:param fake_date: A date that you can use to generate a ctime.
:type request: RequestBuilder
:param request: Request builder to send specific events
Events are sent through the request builder instead of being returned.
"""
events = []
for trigger in self._triggers:
event = trigger.apply_on(self, hour_of_day)
if event is not None:
events.append(event)
trigger.apply_on(self, hour_of_day, fake_date, request)
for dep in self.dependencies:
events.extend(dep.collect_events(hour_of_day))
return events
dep.collect_events(hour_of_day, fake_date, request)
class FMSTCPHandler(SocketServer.BaseRequestHandler):
@ -233,13 +235,11 @@ class FMSTCPHandler(SocketServer.BaseRequestHandler):
hour_of_day = fake_date.hour
while not self.server.terminate:
self.server.system.next_state(hour_of_day)
events = self.server.system.collect_events(hour_of_day)
request = RequestBuilder(self.request)
self.server.system.collect_events(hour_of_day, fake_date, request)
try:
self.request.send("{0}\n".format(json.dumps({
'ctime': fake_date.ctime(),
'events': events
}, cls=DictEncoder)))
request.finalize()
except IOError:
logger.debug("Source is now off")
self.server.terminate = True
@ -252,6 +252,27 @@ class FMSTCPHandler(SocketServer.BaseRequestHandler):
hour_of_day = 0
class RequestBuilder(object):
def __init__(self, request):
self._request = request
self._collected_data = []
def send(self, data):
"""
Send an object over the network.
:param data: Object to send.
"""
self._collected_data.append(data)
def finalize(self):
for data in self._collected_data:
self._request.send("{0}\n".format(json.dumps(data,
cls=DictEncoder)))
self._request = None
self._collected_data = None
class DictEncoder(json.JSONEncoder):
def default(self, o):

View File

@ -33,7 +33,9 @@ class Trigger(object):
probability of success
:type event_builder:
(monasca_analytics.source.markov_chain.base.StateNode) -> Event
(monasca_analytics.source.markov_chain.base.StateNode,
datetime.datetime,
monasca_analytics.source.markov_chain.base.RequestBuilder) -> None
:param event_builder: Event builder that receives the node and uses
its state to emit an event through the request builder.
"""
@ -41,7 +43,7 @@ class Trigger(object):
self._node_check = node_check
self._event_builder = event_builder
def apply_on(self, node, hour_of_day):
def apply_on(self, node, hour_of_day, fake_date, request):
"""Apply this trigger on the given node.
:type node: monasca_analytics.source.markov_chain.base.StateNode
@ -49,10 +51,14 @@ class Trigger(object):
:type hour_of_day: int
:param hour_of_day: An integer between [0, 24) representing
the hour of the day.
:type fake_date: datetime.datetime
:param fake_date: A date that you can use to generate a ctime.
:type request:
monasca_analytics.source.markov_chain.base.RequestBuilder
:param request: Request builder to send events
"""
if self._prob_check(hour_of_day) and self._node_check(node):
return self._event_builder(node)
return None
self._event_builder(node, fake_date, request)
class Event(object):
@ -77,9 +83,16 @@ class EventBuilder(object):
"""
self._msg = msg
def __call__(self, node):
def __call__(self, node, fake_date, request):
"""
:type node: monasca_analytics.source.markov_chain.base.StateNode
:param node: The node associated with the event.
:type fake_date: datetime.datetime
:param fake_date: A date that you can use to generate a ctime.
:type request:
monasca_analytics.source.markov_chain.base.RequestBuilder
"""
return Event(self._msg, str(node.id()))
request.send({
'ctime': fake_date.ctime(),
'event': Event(self._msg, str(node.id()))
})

View File

@ -0,0 +1,134 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import random
import schema
import monasca_analytics.source.markov_chain.base as base
import monasca_analytics.source.markov_chain.events as ev
import monasca_analytics.source.markov_chain.prob_checks as pck
import monasca_analytics.source.markov_chain.state_check as dck
import monasca_analytics.source.markov_chain.transition as tr
import monasca_analytics.util.timestamp as tp
logger = logging.getLogger(__name__)
class MonascaMarkovChainSource(base.MarkovChainSource):
@staticmethod
def validate_config(_config):
return schema.Schema({
"module": schema.And(basestring,
lambda i: not any(c.isspace() for c in i)),
"params": {
"server_sleep_in_seconds": schema.And(float,
lambda v: 0 < v < 1)
},
}).validate(_config)
@staticmethod
def get_default_config():
return {
"module": MonascaMarkovChainSource.__name__,
"params": {
"server_sleep_in_seconds": 0.01
}
}
def get_feature_list(self):
return ["vm1", "vm2", "host1", "host2"]
def _create_system(self):
mc = tr.MarkovChain([])
vm_triggers = [
ev.Trigger(
event_builder=MonascaFakeMetricBuilder("vm.mem.used_mb"),
node_check=dck.TrueCheck(),
prob_check=pck.NoProbCheck()
),
ev.Trigger(
event_builder=MonascaFakeMetricBuilder("cpu.idle_perc"),
node_check=dck.TrueCheck(),
prob_check=pck.NoProbCheck()
),
ev.Trigger(
event_builder=MonascaFakeMetricBuilder(
"cpu.total_logical_cores"),
node_check=dck.TrueCheck(),
prob_check=pck.NoProbCheck()
)
]
host_trigger = ev.Trigger(
event_builder=MonascaFakeMetricBuilder("mem.total_mb"),
node_check=dck.TrueCheck(),
prob_check=pck.NoProbCheck()
)
return [
# vm.mem.used_mb
base.StateNode(3, mc, vm_triggers[0], _id="vm1"),
base.StateNode(1, mc, vm_triggers[0], _id="vm2"),
# cpu.idle_perc
base.StateNode(0.75, mc, vm_triggers[1], _id="vm1"),
base.StateNode(0.75, mc, vm_triggers[1], _id="vm2"),
# cpu.total_logical_cores
base.StateNode(3, mc, vm_triggers[2], _id="vm1"),
base.StateNode(2, mc, vm_triggers[2], _id="vm2"),
# mem.total_mb
base.StateNode(5, mc, host_trigger, _id="host1"),
base.StateNode(6, mc, host_trigger, _id="host2"),
]
class MonascaFakeMetricBuilder(object):
def __init__(self, metric_name):
"""
:type metric_name: str
:param metric_name: The name of the metric
"""
self.metric_name = metric_name
def __call__(self, node, fake_date, request):
"""
:type node: monasca_analytics.source.markov_chain.base.StateNode
:param node: The node associated with the event.
:type fake_date: datetime.datetime
:param fake_date: A date that you can use to generate a ctime.
:type request:
monasca_analytics.source.markov_chain.base.RequestBuilder
"""
half_hour = 60 * 60 / 2
request.send({
"metric": {
"name": self.metric_name,
"dimensions": {
"service": "monitoring",
"hostname": node.id()
},
"timestamp": tp.timestamp(fake_date) +
random.randint(- half_hour, half_hour),
"value": node.state
},
"meta": {
"tenantId": 0,
"region": "earth"
},
"creation_time": 0
})

View File

@ -17,16 +17,15 @@
import abc
import json
import logging
# TODO(David): Recursive import => File needs to be renamed
import numpy as np
import random
import schema
import six
import SocketServer
import threading as th
import time
import uuid
import numpy as np
import schema
import six
import monasca_analytics.exception.monanas as err
from monasca_analytics.source import base

View File

@ -17,7 +17,6 @@
import logging
import pyspark
from pyspark import streaming
import monasca_analytics.config.config as config
import monasca_analytics.ingestor.base as bi
@ -25,6 +24,7 @@ import monasca_analytics.ldp.base as mldp
import monasca_analytics.sink.base as msink
import monasca_analytics.sml.base as bml
import monasca_analytics.spark.aggregator as agg
import monasca_analytics.spark.streaming_context as streamingctx
import monasca_analytics.voter.base as mvoter
logger = logging.getLogger(__name__)
@ -44,14 +44,14 @@ class DriverExecutor(object):
self._orchestrator = agg.Aggregator(self)
def restart_spark():
self._ssc = streaming.StreamingContext(self._sc, _config[
"spark_config"]["streaming"]["batch_interval"])
self._ssc = streamingctx.create_streaming_context(
self._sc,
_config)
self._restart_spark = restart_spark
self._sc = pyspark.SparkContext(
appName=_config["spark_config"]["appName"])
self._ssc = streaming.StreamingContext(self._sc, _config[
"spark_config"]["streaming"]["batch_interval"])
self._ssc = streamingctx.create_streaming_context(self._sc, _config)
logger.debug("Propagating feature list...")
self._propagate_feature_list()
@ -59,14 +59,23 @@ class DriverExecutor(object):
"""Start the pipeline"""
# Start by connecting the source
self._prepare_phase(self._connect_dependents_phase1)
if self._phase1_required():
logger.info("Phase 1 required, ldp won't produce data until"
" smls have finished.")
# Connect sources to ingestors
self._prepare_phase(self._connect_dependents_phase1)
# Preparation step for the orchestrator:
# Accumulate everything from the sources
self._orchestrator.prepare_final_accumulate_stream_step()
# Preparation step for the orchestrator:
# Accumulate everything from the sources
self._orchestrator.prepare_final_accumulate_stream_step()
# Then prepare the orchestrator
self._prepare_orchestrator()
# Then prepare the orchestrator
self._prepare_orchestrator()
else:
# Connect sources to ldps
logger.info("Phase 1 was not required, skipping it.")
self._prepare_phase(self._connect_dependents_phase2)
logger.info("Start the streaming context")
self._ssc.start()
@ -88,12 +97,21 @@ class DriverExecutor(object):
logger.debug("Phase 2: Create new connections")
self._prepare_phase(self._connect_dependents_phase2)
self._ssc.start()
# ?
self._ssc.awaitTermination()
def _terminate_sources(self):
"""Terminates the sources."""
for source in self._sources:
source.terminate_source()
def _phase1_required(self):
for src in self._sources:
if any(isinstance(el, bi.BaseIngestor) for el in self._links[src]):
return True
return False
def _prepare_orchestrator(self):
"""
This is a part of phase 1. The orchestrator collects
@ -137,14 +155,14 @@ class DriverExecutor(object):
# SML can, for now, only be connected to voter.
if isinstance(connected_node, mvoter.BaseVoter) and \
isinstance(from_component, bml.BaseSML):
isinstance(from_component, bml.BaseSML):
logger.debug("Set {} to {}"
.format(connected_node, from_component))
from_component.set_voter(connected_node)
# Voter can only be connected to LDPs
if isinstance(from_component, mvoter.BaseVoter) and \
isinstance(connected_node, mldp.BaseLDP):
isinstance(connected_node, mldp.BaseLDP):
logger.debug("Append {} to {}"
.format(connected_node, from_component))
from_component.append_ldp(connected_node)
@ -178,11 +196,15 @@ class DriverExecutor(object):
# Live data processors are also doing a map, they add
# the causality bit to each element in the stream.
if isinstance(connected_node, mldp.BaseLDP):
logger.debug("Connecting {} to {}".format(from_component,
connected_node))
new_dstream = connected_node.map_dstream(dstream)
self._connect_dependents_phase2(new_dstream, connected_node)
# Sink are at the end of the branch!
if isinstance(connected_node, msink.BaseSink):
logger.debug("Sink {} into {}".format(from_component,
connected_node))
connected_node.sink_dstream(dstream)
def _connect_dependents_phase1(self, dstream, from_component):

View File

@ -0,0 +1,98 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os.path as os_path
import pyspark.sql as sql
import pyspark.streaming as streaming
logger = logging.getLogger(__name__)
class DriverStreamingListener(streaming.StreamingListener):
@staticmethod
def onBatchCompleted(batchCompleted):
logger.debug("Batch Completed: \n\t{}\n".format(
batchCompleted))
@staticmethod
def onBatchStarted(batchStarted):
logger.debug("Batch Started: \n\t{}\n".format(
batchStarted))
@staticmethod
def onBatchSubmitted(batchSubmitted):
logger.debug("Batch submitted: \n\t{}\n".format(
batchSubmitted))
@staticmethod
def onOutputOperationCompleted(outputOperationCompleted):
logger.debug("Job of batch has completed: \n\t{}\n".format(
outputOperationCompleted))
@staticmethod
def onOutputOperationStarted(outputOperationStarted):
logger.debug("Job of a batch has started: \n\t{}\n".format(
outputOperationStarted))
@staticmethod
def onReceiverError(receiverError):
logger.warn("Receiver has reported an error: \n\t{}\n".format(
receiverError))
@staticmethod
def onReceiverStarted(receiverStarted):
logger.debug("Receiver has been started: \n\t{}\n".format(
receiverStarted))
@staticmethod
def onReceiverStopped(receiverStopped):
logger.debug("Receiver has stopped: \n\t{}\n".format(
receiverStopped))
def create_streaming_context(spark_context, config):
"""
Create a streaming context with a custom Streaming Listener
that will log every event.
:param spark_context: Spark context
:type spark_context: pyspark.SparkContext
:type config: dict
:param config: Configuration used to read the batch interval.
:return: Returns a new streaming context from the given context.
:rtype: pyspark.streaming.StreamingContext
"""
ssc = streaming.StreamingContext(spark_context, config[
"spark_config"]["streaming"]["batch_interval"])
ssc.addStreamingListener(DriverStreamingListener)
directory = os_path.expanduser("~/checkpointing")
logger.info("Checkpointing to `{}`".format(directory))
ssc.checkpoint(directory)
return ssc
def get_sqlcontext_instance(spark_context):
"""
:type spark_context: pyspark.SparkContext
:param spark_context: The currently active Spark Context
:return: Returns the SQLContext
:rtype: sql.SQLContext
"""
if 'sqlContextSingletonInstance' not in globals():
globals()['sqlContextSingletonInstance'] = sql.SQLContext(
spark_context)
return globals()['sqlContextSingletonInstance']
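
A driver-side usage sketch, assuming a local pyspark installation and a
configuration shaped like the sample configs above:

import pyspark

sc = pyspark.SparkContext(appName="testApp")
ssc = create_streaming_context(
    sc, {"spark_config": {"streaming": {"batch_interval": 1}}})
# The listener registered above will log batch and receiver events.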

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
def from_json(rdd_entry):
return json.loads(rdd_entry)

View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
def timestamp(date_time_object):
"""
Returns the timestamp associated with the given
datetime object.
:type date_time_object: datetime.datetime
:param date_time_object: datetime object that will be converted
:rtype: int
:return: Returns the appropriate timestamp
"""
return int(time.mktime(date_time_object.timetuple()))
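
Usage note: `time.mktime` interprets the datetime in the local timezone,
so the exact value depends on where the code runs:

import datetime

# Prints an integer number of seconds since the epoch (timezone dependent).
print(timestamp(datetime.datetime(2016, 6, 24, 15, 6, 52)))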

View File

@ -2,4 +2,4 @@
hacking>=0.10.2
flake8>=2.2.4
nose==1.3.0
mock>=1.0.1
mock>=1.0.1

View File

@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest

View File

@ -16,7 +16,7 @@
import copy
import json
import logging
import logging.config
import os
import unittest

View File

@ -16,7 +16,7 @@
import copy
import json
import logging
import logging.config
import os
import unittest

View File

@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest

View File

@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest

View File

@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest

View File

@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest
@ -37,17 +37,19 @@ class TestIptablesIngestor(unittest.TestCase):
def setUp(self):
self.setup_logging()
self.rdd_entry = {
self.rdd_entry = [{
"ctime": "Mon Apr 11 19:59:12 2016",
"events": [
{
"msg": "OUTPUT -p icmp --icmp-type echo-request -j ACCEPT",
"id": "1"},
{
"msg": "OUTPUT -o eth0 -p tcp --sport 80 -j ACCEPT",
"id": "1"}
]
}
"event": {
"msg": "OUTPUT -p icmp --icmp-type echo-request -j ACCEPT",
"id": "1"
}
}, {
"ctime": "Mon Apr 11 19:59:12 2016",
"event": {
"msg": "OUTPUT -o eth0 -p tcp --sport 80 -j ACCEPT",
"id": "1"
}
}]
self.ip_ing = ipt_ing.IptablesIngestor("fake_id",
{"module": "fake_config"})
self.ip_ing.set_feature_list(["ssh", "ip", "http", "ping"])
@ -61,10 +63,15 @@ class TestIptablesIngestor(unittest.TestCase):
self.assertEqual("IptablesIngestor", default_config["module"])
def test_process_data(self):
rdd_str = '{"ctime": "Mon Apr 11 19:59:12 2016","events": ['
rdd_entry = []
for iptable in ipt_src.iptables:
rdd_str += '{"msg": "' + iptable + '","id": "1"}, '
rdd_str = rdd_str[:-2] + ']}'
processed = self.ip_ing._process_data(rdd_str, self.ip_ing._features)
rdd_entry.append({
'ctime': "Mon Apr 11 19:59:12 2016",
'event': {
"msg": iptable,
"id": 1
}
})
processed = self.ip_ing._process_data(rdd_entry, self.ip_ing._features)
np.testing.assert_array_equal(
processed, np.array([2, 4, 2, 4]))

View File

@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest
@ -35,17 +35,20 @@ class TestIptablesLDP(unittest.TestCase):
def setUp(self):
self.setup_logging()
self.rdd_entry = {
self.rdd_entry = [{
"ctime": "Mon Apr 11 19:59:12 2016",
"events": [
{
"msg": "OUTPUT -p icmp --icmp-type echo-request -j ACCEPT",
"id": "1"},
{
"msg": "OUTPUT -o eth0 -p tcp --sport 80 -j ACCEPT",
"id": "1"}
]
}
"event": {
"msg": "OUTPUT -p icmp --icmp-type echo-request -j ACCEPT",
"id": "1"
}
}, {
"ctime": "Mon Apr 11 19:59:12 2016",
"event": {
"msg": "OUTPUT -o eth0 -p tcp --sport 80 -j ACCEPT",
"id": "1"
}
}]
self.raw_events = map(lambda x: x["event"], self.rdd_entry)
self.ip_ldp = iptables_ldp.IptablesLDP("fake_id",
{"module": "fake_config"})
@ -53,31 +56,30 @@ class TestIptablesLDP(unittest.TestCase):
pass
def assert_anomalous_events(self, events, anomalous=True):
expected_events = self.rdd_entry["events"]
expected_events = self.raw_events
for exv in expected_events:
exv["ctime"] = self.rdd_entry["ctime"]
exv["anomalous"] = anomalous
self.assertEqual(expected_events, events)
def test_detect_anomalies_no_features(self):
self.clf = classifier_mock.MockClassifier(False)
self.ip_ldp.set_voter_output(self.clf)
ret = self.ip_ldp._detect_anomalies(json.dumps(self.rdd_entry),
ret = self.ip_ldp._detect_anomalies(self.rdd_entry,
self.ip_ldp._data)
self.assertEqual(self.rdd_entry["events"], ret)
self.assertEqual(self.raw_events, ret)
def test_detect_anomalies_no_classifier(self):
self.clf = classifier_mock.MockClassifier(False)
self.ip_ldp.set_feature_list(["ssh", "ip", "http", "ping"])
ret = self.ip_ldp._detect_anomalies(json.dumps(self.rdd_entry),
ret = self.ip_ldp._detect_anomalies(self.rdd_entry,
self.ip_ldp._data)
self.assertEqual(self.rdd_entry["events"], ret)
self.assertEqual(self.raw_events, ret)
def test_detect_anomalies_non_anomalous(self):
self.clf = classifier_mock.MockClassifier(False)
self.ip_ldp.set_feature_list(["ssh", "ip", "http", "ping"])
self.ip_ldp.set_voter_output(self.clf)
ret = self.ip_ldp._detect_anomalies(json.dumps(self.rdd_entry),
ret = self.ip_ldp._detect_anomalies(self.rdd_entry,
self.ip_ldp._data)
self.assert_anomalous_events(ret, False)
@ -85,6 +87,6 @@ class TestIptablesLDP(unittest.TestCase):
self.clf = classifier_mock.MockClassifier(True)
self.ip_ldp.set_feature_list(["ssh", "ip", "http", "ping"])
self.ip_ldp.set_voter_output(self.clf)
ret = self.ip_ldp._detect_anomalies(json.dumps(self.rdd_entry),
ret = self.ip_ldp._detect_anomalies(self.rdd_entry,
self.ip_ldp._data)
self.assert_anomalous_events(ret, True)

test/mocks/markov.py (new file, 31 lines)

@ -0,0 +1,31 @@
#!/usr/bin/env python
# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import monasca_analytics.source.markov_chain.base as bmkv
class MockRequestBuilder(bmkv.RequestBuilder):
def __init__(self, events):
super(MockRequestBuilder, self).__init__(request=None)
self.events = events
def send(self, data):
self.events.append(data)
def finalize(self):
pass
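
# Minimal usage sketch (hypothetical; the payload shape below is only an
# illustration of how a test can inspect what was sent through the mock):
#
#     collected = []
#     builder = MockRequestBuilder(collected)
#     builder.send({"event": "example"})
#     builder.finalize()
#     assert collected == [{"event": "example"}]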


@ -58,6 +58,12 @@ class MockStreamingContext(object):
def mockDStream(self):
return MockDStream(None, None, None)
def addStreamingListener(self, *arg, **kwargs):
pass
def checkpoint(self, directory):
pass
class MockKafkaUtils(object):


@ -14,8 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import json
import logging
import logging.config
import os
import unittest
@ -24,6 +25,7 @@ import monasca_analytics.source.markov_chain.events as ev
import monasca_analytics.source.markov_chain.prob_checks as pck
import monasca_analytics.source.markov_chain.state_check as dck
import monasca_analytics.source.markov_chain.transition as tr
import test.mocks.markov as markov_mocks
class StateNodeTest(unittest.TestCase):
@ -49,9 +51,11 @@ class StateNodeTest(unittest.TestCase):
event_builder=ev.EventBuilder("test")
)
node = base.StateNode(0, None, some_trigger)
events = node.collect_events(1)
events = []
node.collect_events(1, datetime.datetime.now(),
markov_mocks.MockRequestBuilder(events))
self.assertTrue(len(events) == 1)
self.assertEqual(events[0].msg, "test", "a")
self.assertEqual(events[0]["event"].msg, "test", "a")
def test_next_state_should_use_available_transitions(self):
tr1 = tr.Transition(


@ -14,14 +14,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import json
import logging
import logging.config
import os
import unittest
import monasca_analytics.source.markov_chain.events as ev
import monasca_analytics.source.markov_chain.prob_checks as pck
import monasca_analytics.source.markov_chain.state_check as dck
import test.mocks.markov as markov_mocks
class DummyState(object):
@ -56,8 +58,20 @@ class TriggersTest(unittest.TestCase):
prob_check=pck.NoProbCheck(),
event_builder=ev.EventBuilder("")
)
self.assertIsNotNone(some_trigger.apply_on(DummyState(), 1))
self.assertIsNone(some_trigger.apply_on(DummyState(1), 1))
events = []
some_trigger.apply_on(
DummyState(),
1,
datetime.datetime.now(),
markov_mocks.MockRequestBuilder(events))
self.assertEqual(len(events), 1)
events = []
some_trigger.apply_on(
DummyState(1),
1,
datetime.datetime.now(),
markov_mocks.MockRequestBuilder(events))
self.assertEqual(len(events), 0)
if __name__ == "__main__":


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,11 +15,11 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest
from monasca_analytics.source import random
from monasca_analytics.source import randoms
class TestRandomSource(unittest.TestCase):
@ -39,6 +39,6 @@ class TestRandomSource(unittest.TestCase):
pass
def test_get_default_config(self):
default_config = random.RandomSource.get_default_config()
random.RandomSource.validate_config(default_config)
default_config = randoms.RandomSource.get_default_config()
randoms.RandomSource.validate_config(default_config)
self.assertEqual("RandomSource", default_config["module"])


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -57,19 +57,22 @@ class DriverExecutorTest(unittest.TestCase):
def _backup_functions(self):
self.original_get_class_by_name = cu.get_class_by_name
self.original_SparkContext = driver.pyspark.SparkContext
self.original_StreamingContext = driver.streaming.StreamingContext
self.original_StreamingContext = \
driver.streamingctx.streaming.StreamingContext
self.original_Aggregator = driver.agg.Aggregator
def _restore_functions(self):
cu.get_class_by_name = self.original_get_class_by_name
driver.pyspark.SparkContext = self.original_SparkContext
driver.streaming.StreamingContext = self.original_StreamingContext
driver.streamingctx.streaming.StreamingContext = \
self.original_StreamingContext
driver.agg.Aggregator = self.original_Aggregator
def _mock_functions(self):
cu.get_class_by_name = sml_mocks.mock_get_class_by_name
driver.pyspark.SparkContext = spark_mocks.MockSparkContext
driver.streaming.StreamingContext = spark_mocks.MockStreamingContext
driver.streamingctx.streaming.StreamingContext = \
spark_mocks.MockStreamingContext
driver.agg.Aggregator = sml_mocks.MockClass_aggr_module
def init_sml_config(self):


@ -58,21 +58,24 @@ class MonanasTest(unittest.TestCase):
self.original_kill = mnn.os.kill
self.original_get_class_by_name = cu.get_class_by_name
self.original_SparkContext = driver.pyspark.SparkContext
self.original_StreamingContext = driver.streaming.StreamingContext
self.original_StreamingContext = \
driver.streamingctx.streaming.StreamingContext
self.original_Aggregator = driver.agg.Aggregator
def _restore_functions(self):
cu.get_class_by_name = self.original_get_class_by_name
mnn.os.kill = self.original_kill
driver.pyspark.SparkContext = self.original_SparkContext
driver.streaming.StreamingContext = self.original_StreamingContext
driver.streamingctx.streaming.StreamingContext = \
self.original_StreamingContext
driver.agg.Aggregator = self.original_Aggregator
def _mock_functions(self):
cu.get_class_by_name = sml_mocks.mock_get_class_by_name
mnn.os.kill = sml_mocks.mock_kill
driver.pyspark.SparkContext = spark_mocks.MockSparkContext
driver.streaming.StreamingContext = spark_mocks.MockStreamingContext
driver.streamingctx.streaming.StreamingContext = \
spark_mocks.MockStreamingContext
driver.agg.Aggregator = sml_mocks.MockClass_aggr_module
def init_sml_config(self):


@ -91,7 +91,8 @@ class CommonUtilTest(unittest.TestCase):
names = common_util.get_available_source_class_names()
self.assertItemsEqual(
['RandomSource', 'KafkaSource',
'CloudMarkovChainSource', 'IPTablesSource'],
'CloudMarkovChainSource', 'IPTablesSource',
'MonascaMarkovChainSource'],
names)
def test_get_available_ingestor_class_names(self):
@ -121,5 +122,10 @@ class CommonUtilTest(unittest.TestCase):
self.assertItemsEqual(["PickIndexVoter"], names)
def test_get_available_ldp_class_names(self):
return
names = common_util.get_available_ldp_class_names()
self.assertItemsEqual(["CloudCausalityLDP", "IptablesLDP"], names)
self.assertItemsEqual([
"CloudCausalityLDP", "IptablesLDP",
'MonascaDerivativeLDP', 'MonascaAggregateLDP',
'MonascaCombineLDP'
], names)


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest


@ -15,7 +15,7 @@
# under the License.
import json
import logging
import logging.config
import os
import unittest