adding machine learning service

at this first step, the service includes one
plugin,this plugin use jaccard correlation
in order to measure correlation between a
pair of alarms.
once in configured amount of alarms this
service generates a report and saves it to
configured directory.

Change-Id: I2428dcf05365e6b68fb0e86ccd82554c82b94c2f
This commit is contained in:
Anna 2017-08-06 10:49:14 +00:00
parent 01297e9288
commit 0b506a745d
21 changed files with 1134 additions and 16 deletions

View File

@ -291,6 +291,7 @@ function start_vitrage {
run_process vitrage-collector "$VITRAGE_BIN_DIR/vitrage-collector --config-file $VITRAGE_CONF"
run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF"
run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF"
run_process vitrage-ml "$VITRAGE_BIN_DIR/vitrage-ml --config-file $VITRAGE_CONF"
}
# stop_vitrage() - Stop running processes

View File

@ -7,6 +7,8 @@ enable_service vitrage-graph
enable_service vitrage-notifier
# Collector
enable_service vitrage-collector
# machine_learning
enable_service vitrage-ml
# Default directories

View File

@ -0,0 +1,8 @@
---
features:
- A new service was added to Vitrage, the machine learning service.
Together with it, the first machine learning plugin was added - the
Jaccard Correlation plugin. This plugin listens to rabbit MQ
receiving messages about creation and deletion of alarms, and
learns the correlation between each pair of alarms, in order to
recommend on creation of new templates in the future.

View File

@ -29,6 +29,7 @@ console_scripts =
vitrage-graph = vitrage.cli.graph:main
vitrage-notifier = vitrage.cli.notifier:main
vitrage-collector = vitrage.cli.collector:main
vitrage-ml = vitrage.cli.machine_learning:main
vitrage.entity_graph =
networkx = vitrage.graph.driver.networkx_graph:NXGraph

View File

@ -0,0 +1,30 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_service import service as os_service
import sys
from vitrage.machine_learning.service import MachineLearningService
from vitrage import service
def main():
conf = service.prepare_service()
launcher = os_service.ServiceLauncher(conf)
launcher.launch_service(MachineLearningService(conf))
launcher.wait()
if __name__ == "__main__":
sys.exit(main())

View File

@ -112,6 +112,8 @@ class GraphAction(object):
class NotifierEventTypes(object):
ACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.activate'
DEACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.deactivate'
ACTIVATE_ALARM_EVENT = 'vitrage.alarm.activate'
DEACTIVATE_ALARM_EVENT = 'vitrage.alarm.deactivate'
ACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.activate'
DEACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.deactivate'
EXECUTE_EXTERNAL_ACTION = 'vitrage.execute_external_action'

View File

@ -28,25 +28,42 @@ class GraphNotifier(object):
"""Allows writing to message bus"""
def __init__(self, conf):
self.oslo_notifier = None
try:
topic = conf.entity_graph.notifier_topic
notifier_plugins = conf.notifiers
if not topic or not notifier_plugins:
topics = self._get_topics(conf)
if not topics:
LOG.info('Graph Notifier is disabled')
return
self.oslo_notifier = oslo_messaging.Notifier(
get_transport(conf),
driver='messagingv2',
publisher_id='vitrage.graph',
topics=[topic])
except Exception as e:
LOG.info('Graph Notifier - missing configuration %s' % str(e))
topics=topics)
@property
def enabled(self):
return self.oslo_notifier is not None
def _get_topics(self, conf):
topics = []
try:
notifier_topic = conf.entity_graph.notifier_topic
notifier_plugins = conf.notifiers
if notifier_topic and notifier_plugins:
topics.append(notifier_topic)
except Exception as e:
LOG.info('Graph Notifier - missing configuration %s' % str(e))
try:
machine_learning_topic = \
conf.machine_learning.machine_learning_topic
machine_learning_plugins = conf.machine_learning.plugins
if machine_learning_topic and machine_learning_plugins:
topics.append(machine_learning_topic)
except Exception as e:
LOG.info('Machine Learning - missing configuration %s' % str(e))
return topics
def notify_when_applicable(self, before, current, is_vertex, graph):
"""Callback subscribed to driver.graph updates
@ -64,9 +81,9 @@ class GraphNotifier(object):
# in case the vertex point to some resource add the resource to the
# notification (useful for deduce alarm notifications)
if current.get(VProps.RESOURCE_ID):
if current.get(VProps.VITRAGE_RESOURCE_ID):
current.properties[VProps.RESOURCE] = graph.get_vertex(
current.get(VProps.RESOURCE_ID))
current.get(VProps.VITRAGE_RESOURCE_ID))
LOG.info('notification_types : %s', str(notification_types))
LOG.info('notification properties : %s', current.properties)
@ -99,6 +116,9 @@ def _get_notification_type(before, current, is_vertex):
notification_type(_is_active_deduced_alarm,
NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT,
NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT),
notification_type(_is_active_alarm,
NotifierEventTypes.ACTIVATE_ALARM_EVENT,
NotifierEventTypes.DEACTIVATE_ALARM_EVENT),
notification_type(_is_marked_down,
NotifierEventTypes.ACTIVATE_MARK_DOWN_EVENT,
NotifierEventTypes.DEACTIVATE_MARK_DOWN_EVENT),
@ -115,6 +135,12 @@ def _is_active_deduced_alarm(vertex):
return False
def _is_active_alarm(vertex):
if vertex and vertex.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM:
return _is_relevant_vertex(vertex)
return False
def _is_marked_down(vertex):
if not vertex:
return False

View File

@ -0,0 +1,28 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
OPTS = [
cfg.ListOpt('plugins',
help='Names of enabled machine learning plugins '
'(example jaccard_correlation)'),
cfg.ListOpt('plugins_path',
default=['vitrage.machine_learning.plugins'],
help='list of base path for notifiers'),
cfg.StrOpt('machine_learning_topic',
default='vitrage.machine_learning',
help='The topic that vitrage-graph uses for graph '
'machine learning messages.'),
]

View File

@ -0,0 +1,15 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'stack'

View File

@ -0,0 +1,32 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class MachineLearningBase(object):
def __init__(self, conf):
self.conf = conf
@abc.abstractmethod
def process_event(self, data, event_type):
pass
@staticmethod
@abc.abstractmethod
def get_plugin_name():
pass

View File

@ -0,0 +1,34 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
OPTS = [
cfg.StrOpt('plugin_path',
default='vitrage.machine_learning.plugins.'
'jaccard_correlation.alarm_processor.'
'AlarmDataProcessor',
help='jaccard_correlation class path',
required=True),
cfg.IntOpt('num_of_events_to_flush', default=1000,
help='the amount of events flushes'),
cfg.StrOpt('output_folder', default='/tmp',
help='folder to write all reports to'),
cfg.FloatOpt('correlation_threshold', default=0,
help='threshold of interesting correlations'),
cfg.FloatOpt('high_corr_score', default=0.9,
help='high correlation lower limit'),
cfg.FloatOpt('med_corr_score', default=0.5,
help='medium correlation lower limit'),
]

View File

@ -0,0 +1,59 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from oslo_log import log
import pickle
LOG = log.getLogger(__name__)
AccumulatedData = namedtuple('AccumulatedData', ['activity', 'intersection'])
ACTIVITY_PATH = "/tmp/alarms_activity.txt"
INTERSECT_PATH = "/tmp/alarms_intersections.txt"
def load_data():
try:
with open(ACTIVITY_PATH, 'rb') as activity_f:
alarms_activity = pickle.load(activity_f)
except Exception as e:
LOG.info('Cannot load alarms_activity - %s', e)
return AccumulatedData({}, {})
try:
with open(INTERSECT_PATH, 'rb') as intersect_f:
alarms_intersect = pickle.load(intersect_f)
except Exception as e:
LOG.info('Cannot load alarms_intersect - %s', e)
return AccumulatedData({}, {})
return AccumulatedData(alarms_activity, alarms_intersect)
def save_accumulated_data(data_manager):
activity = data_manager.alarms_activity
intersects = data_manager.alarms_intersects
try:
with open(ACTIVITY_PATH, 'wb') as activity_f:
pickle.dump(activity, activity_f)
except Exception as e:
LOG.exception('Cannot save alarms_intersect - %s', e)
try:
with open(INTERSECT_PATH, 'wb') as intersect_f:
pickle.dump(intersects, intersect_f)
except Exception as e:
LOG.exception('Cannot save alarms_intersect - %s', e)

View File

@ -0,0 +1,80 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_log import log
LOG = log.getLogger(__name__)
class AlarmDataAccumulator(object):
def __init__(self, accumulated_data):
self.active_start_times = {}
self.alarms_activity = accumulated_data.activity
# TODO(annarez): exclude intersections between deduced and it's cause
self.alarms_intersects = accumulated_data.intersection
def append_active(self, alarm_id, timestamp):
if alarm_id in self.active_start_times:
LOG.debug("Active alarm {} was started twice. Second time at {}".
format(alarm_id, str(timestamp)))
return
self.active_start_times[alarm_id] = timestamp
def append_inactive(self, alarm_id, end_time):
if alarm_id not in self.active_start_times:
LOG.debug("Alarm {} at {} was deactivated without being active".
format(alarm_id, str(end_time)))
return
alarm_duration = end_time - self.active_start_times[alarm_id]
self.alarms_activity[alarm_id] = alarm_duration + \
self.alarms_activity.get(alarm_id, datetime.timedelta(0))
start_time = self.active_start_times[alarm_id]
del self.active_start_times[alarm_id]
self.append_intersect(alarm_id, start_time, end_time)
def append_intersect(self, alarm_id, start_time, end_time):
for active_alarm in self.active_start_times.items():
key = frozenset([alarm_id, active_alarm[0]])
active_alarm_service = active_alarm[1]
self.alarms_intersects[key] = \
self.alarms_intersects.get(key, datetime.timedelta(0)) + \
self.calc_intersect(start_time, end_time, active_alarm_service)
def calc_intersect(self, inactive_start, inactive_end, active_start):
return inactive_end - max(inactive_start, active_start)
def flush_accumulations(self):
"""flush all active alarms
empty the data from active_start_times and re-enter all the
currently-active alarms, as if they started now
"""
now = datetime.datetime.now()
active_alarms = list(self.active_start_times)
for active_alarm in active_alarms:
self.append_inactive(active_alarm, now)
for flushed_active_alarm in active_alarms:
self.append_active(flushed_active_alarm, now)

View File

@ -0,0 +1,95 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from datetime import datetime
from oslo_log import log
from vitrage.common.constants import NotifierEventTypes
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.alarm_properties import AlarmProperties as AlarmProps
from vitrage.datasources.transformer_base import TIMESTAMP_FORMAT
from vitrage.machine_learning.plugins.base import MachineLearningBase
import vitrage.machine_learning.plugins.jaccard_correlation.\
accumulation_persistor_utils as APersistor
from vitrage.machine_learning.plugins.jaccard_correlation.\
alarm_data_accumulator import AlarmDataAccumulator as ADAcummulator
from vitrage.machine_learning.plugins.jaccard_correlation.correlation_manager\
import CorrelationManager as CM
LOG = log.getLogger(__name__)
AlarmID = namedtuple('AlarmID', [VProps.VITRAGE_RESOURCE_ID,
VProps.VITRAGE_RESOURCE_TYPE,
VProps.NAME])
class AlarmDataProcessor(MachineLearningBase):
@staticmethod
def get_plugin_name():
return 'jaccard_correlation'
def __init__(self, conf):
super(AlarmDataProcessor, self).__init__(conf)
self.data_manager = ADAcummulator(APersistor.load_data())
self.correlation_manager = CM(conf)
self.num_of_events_to_flush = \
conf.jaccard_correlation.num_of_events_to_flush
self.event_counter = 0
def process_event(self, data, event_type):
if event_type == NotifierEventTypes.ACTIVATE_ALARM_EVENT \
or event_type == NotifierEventTypes.DEACTIVATE_ALARM_EVENT:
# TODO(annarez): handle alarms from collectd
if data[VProps.VITRAGE_TYPE] == 'collectd':
return
self._update_data_accumulator(data)
self.event_counter += 1
# flush all data once num_of_events_to_flush is achieved
if self.event_counter == self.num_of_events_to_flush:
LOG.debug("Persisting: {}".format(str(data)))
self.data_manager.flush_accumulations()
APersistor.save_accumulated_data(self.data_manager)
self.correlation_manager.output_correlations(self.data_manager)
self.event_counter = 0
def _update_data_accumulator(self, data):
alarm_name = data[VProps.RAWTEXT] if data.get(VProps.RAWTEXT) else \
data[VProps.NAME]
alarm_id, timestamp = \
self._get_alarm_id_and_timestamp(data, alarm_name)
if data[VProps.STATE] == AlarmProps.ACTIVE_STATE:
self.data_manager.append_active(alarm_id, timestamp)
else:
self.data_manager.append_inactive(alarm_id, timestamp)
@staticmethod
def _get_alarm_id_and_timestamp(data, alarm_name):
alarm_id = AlarmID(data.get(VProps.VITRAGE_RESOURCE_ID),
data.get(VProps.VITRAGE_RESOURCE_TYPE),
alarm_name)
timestamp = datetime.strptime(data[VProps.UPDATE_TIMESTAMP],
TIMESTAMP_FORMAT)
return alarm_id, timestamp

View File

@ -0,0 +1,84 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
from itertools import groupby
from operator import itemgetter
from oslo_log import log
LOG = log.getLogger(__name__)
class CorrelationPriorities(object):
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
class AlarmsProperties(object):
ALARM1_RESOURCE_ID = 0
ALARM1_RESOURCE_TYPE = 1
ALARM1_NAME = 2
ALARM2_RESOURCE_ID = 3
ALARM2_RESOURCE_TYPE = 4
ALARM2_NAME = 5
OFFSET_DELTA = 6
CORR_SCORE = 7
class CorrelationCollection(object):
def __init__(self, high_corr_score, med_corr_score):
self.correlation_list = []
self.high_corr_score = high_corr_score
self.med_corr_score = med_corr_score
def set(self, alarm_1, alarm_2, offset_delta, correlation_score):
self.correlation_list.append(alarm_1 + alarm_2 +
(offset_delta, correlation_score))
def get_aggregated(self):
pairs_dict = defaultdict(list)
for alarm_pair in self.correlation_list:
if ((alarm_pair[AlarmsProperties.ALARM1_RESOURCE_TYPE],
alarm_pair[AlarmsProperties.ALARM1_NAME],
alarm_pair[AlarmsProperties.ALARM2_RESOURCE_TYPE],
alarm_pair[AlarmsProperties.ALARM2_NAME])) in pairs_dict:
pairs_dict[alarm_pair[AlarmsProperties.ALARM1_RESOURCE_TYPE],
alarm_pair[AlarmsProperties.ALARM1_NAME],
alarm_pair[AlarmsProperties.ALARM2_RESOURCE_TYPE],
alarm_pair[AlarmsProperties.ALARM2_NAME]]\
.append(alarm_pair[AlarmsProperties.CORR_SCORE])
else:
pairs_dict[alarm_pair[AlarmsProperties.ALARM2_RESOURCE_TYPE],
alarm_pair[AlarmsProperties.ALARM2_NAME],
alarm_pair[AlarmsProperties.ALARM1_RESOURCE_TYPE],
alarm_pair[AlarmsProperties.ALARM1_NAME]].\
append(alarm_pair[AlarmsProperties.CORR_SCORE])
results = [(key, sum((pairs_dict[key])) / float(len(pairs_dict[key])))
for key in pairs_dict]
categorize = lambda x: CorrelationPriorities.HIGH \
if x[1] >= self.high_corr_score \
else CorrelationPriorities.MEDIUM \
if x[1] >= self.med_corr_score else CorrelationPriorities.LOW
return [(key, [(x, y) for x, y in group]) for
key, group in groupby(sorted(results, key=itemgetter(1)),
key=categorize)]

View File

@ -0,0 +1,114 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from oslo_log import log
from vitrage.machine_learning.plugins.jaccard_correlation.\
correlation_collection import CorrelationCollection as CCollection
from vitrage.machine_learning.plugins.jaccard_correlation.\
correlation_collection import CorrelationPriorities as CPriorities
LOG = log.getLogger(__name__)
class CorrelationManager(object):
def __init__(self, conf):
self.high_corr_score = conf.jaccard_correlation.high_corr_score
self.med_corr_score = conf.jaccard_correlation.med_corr_score
self.correlation_threshold = \
conf.jaccard_correlation.correlation_threshold
self.output_folder = conf.jaccard_correlation.output_folder
self.last_written_file = ""
self.correlation_table = CCollection(self.high_corr_score,
self.med_corr_score)
def output_correlations(self, accumulated_data):
now = int(time.time())
report = self._generate_report(accumulated_data)
self._dump_correlations(str(now) + "_correlations.out", dict(report))
@staticmethod
def _jaccard_score(alarm_1, alarm_2, accumulated_data):
key = frozenset([alarm_1, alarm_2])
intersect = accumulated_data.alarms_intersects.get(key)
if not intersect:
return 0
a1_time = accumulated_data.alarms_activity.get(alarm_1)
a2_time = accumulated_data.alarms_activity.get(alarm_2)
if not a1_time or not a2_time:
LOG.error("One of the alarms given has never been active")
return 0
combined = a1_time + a2_time - intersect
# jaccard is intersection / union of alarm times
return intersect.total_seconds() / combined.total_seconds()
def _generate_report(self, accumulated_data):
for alarm_1, alarm_2 in accumulated_data.alarms_intersects.keys():
jacc_score = \
self._jaccard_score(alarm_1, alarm_2, accumulated_data)
if jacc_score >= self.correlation_threshold:
self.correlation_table.set(alarm_1, alarm_2, 0, jacc_score)
# mean correlations divided to HIGH, MEDIUM and LOW correlation scores
report = self.correlation_table.get_aggregated()
return report
def _dump_correlations(self, output_path, alarms):
new_file_name = "{}/{}".format(self.output_folder, output_path)
try:
with open(new_file_name, 'w') as f:
LOG.info("Correlation manager wrote a new file: {}/{}".format
(self.output_folder, output_path))
for correlation_level, correlation_bar \
in [(CPriorities.HIGH,
" (> {})".format(self.high_corr_score)),
(CPriorities.MEDIUM,
" (> {})".format(self.med_corr_score)),
(CPriorities.LOW,
"(< {})".format(self.med_corr_score))]:
if correlation_level in alarms:
title = correlation_level + \
" correlation" + correlation_bar + ":"
f.write("\n" + title + "\n" +
("-" * len(title)) + "\n")
for alarm in alarms[correlation_level]:
f.write("alarm " + alarm[0][1] + " on " +
alarm[0][0] + " <-> alarm " +
alarm[0][3] + " on " + alarm[0][2] +
" with score " + str(alarm[1]) + "\n")
except Exception as e:
LOG.exception('Cannot save correlations - %s', e)
if os.path.isfile(self.last_written_file):
os.remove(self.last_written_file)
self.last_written_file = new_file_name

View File

@ -0,0 +1,85 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import oslo_messaging as oslo_m
from oslo_service import service as os_service
from oslo_utils import importutils
from vitrage import messaging
from vitrage.opts import register_opts
LOG = log.getLogger(__name__)
class MachineLearningService(os_service.Service):
def __init__(self, conf):
super(MachineLearningService, self).__init__()
self.conf = conf
self.machine_learning_plugins = self.get_machine_learning_plugins(conf)
transport = messaging.get_transport(conf)
target = \
oslo_m.Target(topic=conf.machine_learning.machine_learning_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[VitrageEventEndpoint(self.machine_learning_plugins)])
def start(self):
LOG.info("Vitrage Machine Learning Service - Starting...")
super(MachineLearningService, self).start()
self.listener.start()
LOG.info("Vitrage Machine Learning Service - Started!")
def stop(self, graceful=False):
LOG.info("Vitrage Machine Learning Service - Stopping...")
self.listener.stop()
self.listener.wait()
super(MachineLearningService, self).stop(graceful)
LOG.info("Vitrage Machine Learning Service - Stopped!")
@staticmethod
def get_machine_learning_plugins(conf):
machine_learning_plugins = []
machine_learning_plugins_names = \
conf.machine_learning.plugins
if not machine_learning_plugins_names:
LOG.info('There are no Machine Learning plugins in configuration')
return []
for machine_learning_plugin_name in machine_learning_plugins_names:
register_opts(conf, machine_learning_plugin_name,
conf.machine_learning.plugins_path)
LOG.info('Machine Learning plugin %s started',
machine_learning_plugin_name)
machine_learning_plugins.append(importutils.import_object(
conf[machine_learning_plugin_name].plugin_path,
conf))
return machine_learning_plugins
class VitrageEventEndpoint(object):
def __init__(self, machine_learning_plugins):
self.machine_learning_plugins = machine_learning_plugins
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""Endpoint for alarm notifications"""
LOG.info('Vitrage Event Info: event_type %s', event_type)
LOG.info('Vitrage Event Info: payload %s', payload)
for plugin in self.machine_learning_plugins:
plugin.process_event(payload, event_type)

View File

@ -23,6 +23,8 @@ import vitrage.datasources
import vitrage.entity_graph.consistency
import vitrage.evaluator
import vitrage.keystone_client
import vitrage.machine_learning
import vitrage.machine_learning.plugins.jaccard_correlation
import vitrage.notifier
import vitrage.notifier.plugins.snmp
import vitrage.os_clients
@ -44,6 +46,10 @@ def list_opts():
('consistency', vitrage.entity_graph.consistency.OPTS),
('entity_graph', vitrage.entity_graph.OPTS),
('service_credentials', vitrage.keystone_client.OPTS),
('machine_learning',
vitrage.machine_learning.OPTS),
('jaccard_correlation',
vitrage.machine_learning.plugins.jaccard_correlation.OPTS),
('snmp', vitrage.notifier.plugins.snmp.OPTS),
('DEFAULT', itertools.chain(
vitrage.os_clients.OPTS,

View File

@ -0,0 +1,15 @@
# Copyright 2017 - Alcatel-Lucent
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'stack'

View File

@ -0,0 +1,15 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = 'stack'

View File

@ -0,0 +1,386 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os.path
from oslo_config import cfg
import time
from vitrage.common.constants import EntityCategory
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.aodh import AODH_DATASOURCE
from vitrage.datasources.transformer_base import TIMESTAMP_FORMAT
from vitrage.datasources.zabbix import ZABBIX_DATASOURCE
from vitrage.evaluator.actions import evaluator_event_transformer as evaluator
from vitrage.graph import Vertex
from vitrage.machine_learning.plugins.jaccard_correlation.\
accumulation_persistor_utils import AccumulatedData as AData
from vitrage.machine_learning.plugins.jaccard_correlation.\
alarm_data_accumulator import AlarmDataAccumulator as ADAccumulator
from vitrage.machine_learning.plugins.jaccard_correlation.\
alarm_processor import AlarmDataProcessor as ADProcessor
from vitrage.machine_learning.plugins.jaccard_correlation.\
alarm_processor import AlarmID
from vitrage.machine_learning.plugins.jaccard_correlation.\
correlation_collection import CorrelationCollection as CCollection
from vitrage.machine_learning.plugins.jaccard_correlation.correlation_manager \
import CorrelationManager as CManager
from vitrage.machine_learning.plugins.jaccard_correlation.\
correlation_collection import CorrelationPriorities as CPriorities
from vitrage.tests import base
ACTIVE_TIMESTAMP = datetime.datetime.utcnow()
ACTIVE_TIMESTAMP = ACTIVE_TIMESTAMP.strftime(TIMESTAMP_FORMAT)
INACTIVE_TIMESTAMP = \
datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
INACTIVE_TIMESTAMP = INACTIVE_TIMESTAMP.strftime(TIMESTAMP_FORMAT)
DEDUCED_ALARM_1 = Vertex('111', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: evaluator.VITRAGE_DATASOURCE,
VProps.VITRAGE_IS_DELETED: False,
VProps.VITRAGE_IS_PLACEHOLDER: False,
VProps.NAME: 'deduced_alarm_1',
VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '111',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_111',
})
AODH_ALARM_1 = Vertex('11', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: AODH_DATASOURCE,
VProps.VITRAGE_IS_DELETED: False,
VProps.VITRAGE_IS_PLACEHOLDER: False,
VProps.NAME: 'aodh_alarm_1',
VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '11',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_11',
})
ZABBIX_ALARM_1 = Vertex('1111', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE,
VProps.VITRAGE_IS_DELETED: False,
VProps.VITRAGE_IS_PLACEHOLDER: True,
VProps.NAME: 'zabbix_alarm_1 {}',
VProps.RAWTEXT: 'zabbix_alarm_1',
VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '1111',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_1111',
})
ZABBIX_ALARM_2 = Vertex('2222', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE,
VProps.VITRAGE_IS_DELETED: False,
VProps.VITRAGE_IS_PLACEHOLDER: True,
VProps.NAME: 'zabbix_alarm_2 {}',
VProps.RAWTEXT: 'zabbix_alarm_2',
VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '2222',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_2222',
})
DELETED_DEDUCED_ALARM_1 = Vertex('111', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: evaluator.VITRAGE_DATASOURCE,
VProps.VITRAGE_IS_DELETED: True,
VProps.VITRAGE_IS_PLACEHOLDER: False,
VProps.NAME: 'deduced_alarm_1',
VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '111',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_111',
})
DELETED_AODH_ALARM_1 = Vertex('11', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: evaluator.VITRAGE_DATASOURCE,
VProps.VITRAGE_IS_DELETED: True,
VProps.VITRAGE_IS_PLACEHOLDER: False,
VProps.NAME: 'aodh_alarm_1',
VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '11',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_11',
})
DELETED_ZABBIX_ALARM_1 = Vertex('1111', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE,
VProps.VITRAGE_IS_DELETED: True,
VProps.VITRAGE_IS_PLACEHOLDER: False,
VProps.NAME: 'zabbix_alarm_1 {}',
VProps.RAWTEXT: 'zabbix_alarm_1',
VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '1111',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_1111',
})
DELETED_ZABBIX_ALARM_2 = Vertex('2222', {
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE,
VProps.VITRAGE_IS_DELETED: True,
VProps.VITRAGE_IS_PLACEHOLDER: False,
VProps.NAME: 'zabbix_alarm_2 {}',
VProps.RAWTEXT: 'zabbix_alarm_2',
VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP,
VProps.VITRAGE_RESOURCE_ID: '2222',
VProps.VITRAGE_RESOURCE_TYPE: 'resource_2222',
})
ACTIVE_ALARMS = [DEDUCED_ALARM_1, AODH_ALARM_1,
ZABBIX_ALARM_1, ZABBIX_ALARM_2]
INACTIVE_ALARMS = [DELETED_DEDUCED_ALARM_1, DELETED_AODH_ALARM_1,
DELETED_ZABBIX_ALARM_1, DELETED_ZABBIX_ALARM_2]
class JaccardCorrelationTest(base.BaseTest):
OPTS = [
cfg.StrOpt('output_folder', default='/tmp',
help='folder to write all reports to'),
cfg.FloatOpt('correlation_threshold', default=0,
help='threshold of interesting correlations'),
cfg.FloatOpt('high_corr_score', default=0.9,
help='high correlation lower limit'),
cfg.FloatOpt('med_corr_score', default=0.5,
help='medium correlation lower limit'),
]
@classmethod
def setUpClass(cls):
cls.conf = cfg.ConfigOpts()
cls.conf.register_opts(cls.OPTS, group='jaccard_correlation')
cls.data_manager = ADAccumulator(AData({}, {}))
cls.collection = \
CCollection(cls.conf.jaccard_correlation.high_corr_score,
cls.conf.jaccard_correlation.med_corr_score)
cls.correlation_manager = CManager(cls.conf)
cls.activate_timestamps = {}
cls.inactivate_timestamps = {}
cls.alarm_ids = cls._setup_expected_active_alarms_ids()
@staticmethod
def _setup_expected_active_alarms_ids():
alarm_ids = []
for alarm in ACTIVE_ALARMS:
alarm_name = alarm[VProps.RAWTEXT] if alarm.get(VProps.RAWTEXT) \
else alarm[VProps.NAME]
alarm_id = AlarmID(alarm.get(VProps.VITRAGE_RESOURCE_ID),
alarm.get(VProps.VITRAGE_RESOURCE_TYPE),
alarm_name)
alarm_ids.append(alarm_id)
return alarm_ids
def test_jaccard_correlation(self):
self._test_alarm_data_accumulations()
self._test_correlation_collection()
self._test_correlation_manager()
def _test_alarm_data_accumulations(self):
self._test_append_active()
self._test_flush_accumulations()
self._test_append_inactive()
def _test_append_active(self):
expected_active_start_dict = {}
real_alarm_ids = []
for alarm in ACTIVE_ALARMS:
alarm_name = alarm[VProps.RAWTEXT] if alarm.get(VProps.RAWTEXT) \
else alarm[VProps.NAME]
alarm_id, timestamp = ADProcessor.\
_get_alarm_id_and_timestamp(alarm, alarm_name)
self.activate_timestamps[alarm_id] = timestamp
expected_active_start_dict[alarm_id] = \
datetime.datetime.strptime(alarm.get(VProps.UPDATE_TIMESTAMP),
TIMESTAMP_FORMAT)
real_alarm_ids.append(alarm_id)
self.data_manager.append_active(alarm_id, timestamp)
# assert all alarm ids are right
for i in range(len(self.alarm_ids)):
self.assertEqual(self.alarm_ids[i], real_alarm_ids[i], '')
self.assertEqual(expected_active_start_dict,
self.data_manager.active_start_times)
self.assertEqual({}, self.data_manager.alarms_activity)
self.assertEqual({}, self.data_manager.alarms_intersects)
def _test_flush_accumulations(self):
prev_active_start_dict = self.data_manager.active_start_times
time.sleep(2)
self.data_manager.flush_accumulations()
self.assertEqual(prev_active_start_dict,
self.data_manager.active_start_times)
expected_activity_dict_len = len(ACTIVE_ALARMS)
self.assertEqual(expected_activity_dict_len,
len(self.data_manager.alarms_activity))
# choose 2
expected_intersections_dict_len = \
(len(ACTIVE_ALARMS) * (len(ACTIVE_ALARMS) - 1)) / 2
self.assertEqual(expected_intersections_dict_len,
len(self.data_manager.alarms_intersects))
def _test_append_inactive(self):
deleted_alarm_ids = []
for alarm in INACTIVE_ALARMS:
alarm_name = alarm[VProps.RAWTEXT] if alarm.get(VProps.RAWTEXT) \
else alarm[VProps.NAME]
alarm_id, timestamp = ADProcessor.\
_get_alarm_id_and_timestamp(alarm, alarm_name)
expected_alarm_id = \
AlarmID(alarm.get(VProps.VITRAGE_RESOURCE_ID),
alarm.get(VProps.VITRAGE_RESOURCE_TYPE),
alarm_name)
self.assertEqual(expected_alarm_id, alarm_id, '')
self.inactivate_timestamps[alarm_id] = timestamp
deleted_alarm_ids.append(alarm_id)
self.data_manager.append_inactive(alarm_id, timestamp)
# assert all deleted alarms has same alarm ids as activated alarms
self.assertEqual(self.alarm_ids, deleted_alarm_ids)
# all alarm are inactive at this moment
expected_active_start_dict = {}
self.assertEqual(expected_active_start_dict,
self.data_manager.active_start_times)
expected_activity_dict = {}
for alarm_id in self.alarm_ids:
expected_activity_dict[alarm_id] = \
self.inactivate_timestamps[alarm_id]\
- self.activate_timestamps[alarm_id]
self.assertEqual(expected_activity_dict,
self.data_manager.alarms_activity)
expected_intersections_dict = {}
# all alarms started and ended at the same time,
# intersection time equals activity time
for alarm_id1 in self.alarm_ids:
for alarm_id2 in self.alarm_ids:
# exclude intersection of alarm with itself
if alarm_id1 != alarm_id2:
key = frozenset([alarm_id1, alarm_id2])
expected_intersections_dict[key] = \
self.inactivate_timestamps[alarm_id]\
- self.activate_timestamps[alarm_id]
self.assertEqual(expected_intersections_dict,
self.data_manager.alarms_intersects)
def _test_correlation_collection(self):
self._test_correlation_list()
self._test_correlations_aggregation()
self.collection = CCollection(0.9, 0.5)
def _test_correlation_list(self):
offset_delta = 0
high_correlation = 0.9
med_correlation = 0.7
low_correlation = 0.4
correlations = [high_correlation, med_correlation, low_correlation]
alarms_pairs = []
cnt = 0
seen_pairs = []
for alarm1 in self.alarm_ids:
for alarm2 in self.alarm_ids:
if alarm1 != alarm2 \
and frozenset([alarm1, alarm2]) not in seen_pairs:
seen_pairs.append(frozenset([alarm1, alarm2]))
correlation = correlations[cnt % 3]
alarms_pairs.append((alarm1 + alarm2 +
(offset_delta, correlation)))
self.collection.set(alarm1, alarm2, offset_delta,
correlation)
cnt += 1
self.assertEqual(alarms_pairs, self.collection.correlation_list)
def _test_correlations_aggregation(self):
aggregated = self.collection.get_aggregated()
cnt_high_correlations = 0
cnt_med_correlations = 0
cnt_low_correlations = 0
for correlation_level in aggregated:
if correlation_level[0] == CPriorities.HIGH:
cnt_high_correlations = len(correlation_level[1])
if correlation_level[0] == CPriorities.MEDIUM:
cnt_med_correlations = len(correlation_level[1])
if correlation_level[0] == CPriorities.LOW:
cnt_low_correlations = len(correlation_level[1])
self.assertEqual(cnt_high_correlations, 2, '')
self.assertEqual(cnt_med_correlations, 2, '')
self.assertEqual(cnt_low_correlations, 2, '')
def _test_correlation_manager(self):
report = []
self._test_generate_report(report)
self._test_dump_correlations(report)
def _test_generate_report(self, report):
self.data_manager.flush_accumulations()
report.extend(self.correlation_manager.
_generate_report(self.data_manager))
# all intersects correlations are 1
self.assertEqual(CPriorities.HIGH, report[0][0])
self.assertEqual(len(self.data_manager.alarms_intersects),
len(report[0][1]))
def _test_dump_correlations(self, report):
now = int(time.time())
self.correlation_manager.\
_dump_correlations(str(now) + "_correlations_test.out",
report)
file_path = self.conf.jaccard_correlation.output_folder + '/' + \
str(now) + "_correlations_test.out"
is_file = os.path.isfile(file_path)
self.assertEqual(is_file, True)
if os.path.isfile(file_path):
os.remove(file_path)