diff --git a/devstack/plugin.sh b/devstack/plugin.sh index eb2c13d0d..525bc4928 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -291,6 +291,7 @@ function start_vitrage { run_process vitrage-collector "$VITRAGE_BIN_DIR/vitrage-collector --config-file $VITRAGE_CONF" run_process vitrage-graph "$VITRAGE_BIN_DIR/vitrage-graph --config-file $VITRAGE_CONF" run_process vitrage-notifier "$VITRAGE_BIN_DIR/vitrage-notifier --config-file $VITRAGE_CONF" + run_process vitrage-ml "$VITRAGE_BIN_DIR/vitrage-ml --config-file $VITRAGE_CONF" } # stop_vitrage() - Stop running processes diff --git a/devstack/settings b/devstack/settings index 41f29e513..e368511f9 100644 --- a/devstack/settings +++ b/devstack/settings @@ -7,6 +7,8 @@ enable_service vitrage-graph enable_service vitrage-notifier # Collector enable_service vitrage-collector +# machine_learning +enable_service vitrage-ml # Default directories diff --git a/releasenotes/notes/machine_learning_service-da9700e6c6fa61b6.yaml b/releasenotes/notes/machine_learning_service-da9700e6c6fa61b6.yaml new file mode 100644 index 000000000..c6f0c84fd --- /dev/null +++ b/releasenotes/notes/machine_learning_service-da9700e6c6fa61b6.yaml @@ -0,0 +1,8 @@ +--- +features: + - A new service was added to Vitrage, the machine learning service. + Together with it, the first machine learning plugin was added - the + Jaccard Correlation plugin. This plugin listens to rabbit MQ + receiving messages about creation and deletion of alarms, and + learns the correlation between each pair of alarms, in order to + recommend on creation of new templates in the future. diff --git a/setup.cfg b/setup.cfg index dd09701b7..cf0dcc62d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,7 @@ console_scripts = vitrage-graph = vitrage.cli.graph:main vitrage-notifier = vitrage.cli.notifier:main vitrage-collector = vitrage.cli.collector:main + vitrage-ml = vitrage.cli.machine_learning:main vitrage.entity_graph = networkx = vitrage.graph.driver.networkx_graph:NXGraph diff --git a/vitrage/cli/machine_learning.py b/vitrage/cli/machine_learning.py new file mode 100644 index 000000000..7ab16706a --- /dev/null +++ b/vitrage/cli/machine_learning.py @@ -0,0 +1,30 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_service import service as os_service +import sys + +from vitrage.machine_learning.service import MachineLearningService +from vitrage import service + + +def main(): + conf = service.prepare_service() + launcher = os_service.ServiceLauncher(conf) + launcher.launch_service(MachineLearningService(conf)) + launcher.wait() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/vitrage/common/constants.py b/vitrage/common/constants.py index 435808445..8506dee0b 100644 --- a/vitrage/common/constants.py +++ b/vitrage/common/constants.py @@ -112,6 +112,8 @@ class GraphAction(object): class NotifierEventTypes(object): ACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.activate' DEACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.deactivate' + ACTIVATE_ALARM_EVENT = 'vitrage.alarm.activate' + DEACTIVATE_ALARM_EVENT = 'vitrage.alarm.deactivate' ACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.activate' DEACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.deactivate' EXECUTE_EXTERNAL_ACTION = 'vitrage.execute_external_action' diff --git a/vitrage/entity_graph/processor/notifier.py b/vitrage/entity_graph/processor/notifier.py index a343f1a56..919795c25 100644 --- a/vitrage/entity_graph/processor/notifier.py +++ b/vitrage/entity_graph/processor/notifier.py @@ -28,25 +28,42 @@ class GraphNotifier(object): """Allows writing to message bus""" def __init__(self, conf): self.oslo_notifier = None - try: - topic = conf.entity_graph.notifier_topic - notifier_plugins = conf.notifiers - if not topic or not notifier_plugins: - LOG.info('Graph Notifier is disabled') - return - - self.oslo_notifier = oslo_messaging.Notifier( - get_transport(conf), - driver='messagingv2', - publisher_id='vitrage.graph', - topics=[topic]) - except Exception as e: - LOG.info('Graph Notifier - missing configuration %s' % str(e)) + topics = self._get_topics(conf) + if not topics: + LOG.info('Graph Notifier is disabled') + return + self.oslo_notifier = oslo_messaging.Notifier( + get_transport(conf), + driver='messagingv2', + publisher_id='vitrage.graph', + topics=topics) @property def enabled(self): return self.oslo_notifier is not None + def _get_topics(self, conf): + topics = [] + + try: + notifier_topic = conf.entity_graph.notifier_topic + notifier_plugins = conf.notifiers + if notifier_topic and notifier_plugins: + topics.append(notifier_topic) + except Exception as e: + LOG.info('Graph Notifier - missing configuration %s' % str(e)) + + try: + machine_learning_topic = \ + conf.machine_learning.machine_learning_topic + machine_learning_plugins = conf.machine_learning.plugins + if machine_learning_topic and machine_learning_plugins: + topics.append(machine_learning_topic) + except Exception as e: + LOG.info('Machine Learning - missing configuration %s' % str(e)) + + return topics + def notify_when_applicable(self, before, current, is_vertex, graph): """Callback subscribed to driver.graph updates @@ -64,9 +81,9 @@ class GraphNotifier(object): # in case the vertex point to some resource add the resource to the # notification (useful for deduce alarm notifications) - if current.get(VProps.RESOURCE_ID): + if current.get(VProps.VITRAGE_RESOURCE_ID): current.properties[VProps.RESOURCE] = graph.get_vertex( - current.get(VProps.RESOURCE_ID)) + current.get(VProps.VITRAGE_RESOURCE_ID)) LOG.info('notification_types : %s', str(notification_types)) LOG.info('notification properties : %s', current.properties) @@ -99,6 +116,9 @@ def _get_notification_type(before, current, is_vertex): notification_type(_is_active_deduced_alarm, NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT, NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT), + notification_type(_is_active_alarm, + NotifierEventTypes.ACTIVATE_ALARM_EVENT, + NotifierEventTypes.DEACTIVATE_ALARM_EVENT), notification_type(_is_marked_down, NotifierEventTypes.ACTIVATE_MARK_DOWN_EVENT, NotifierEventTypes.DEACTIVATE_MARK_DOWN_EVENT), @@ -115,6 +135,12 @@ def _is_active_deduced_alarm(vertex): return False +def _is_active_alarm(vertex): + if vertex and vertex.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM: + return _is_relevant_vertex(vertex) + return False + + def _is_marked_down(vertex): if not vertex: return False diff --git a/vitrage/machine_learning/__init__.py b/vitrage/machine_learning/__init__.py new file mode 100644 index 000000000..8f2e2513b --- /dev/null +++ b/vitrage/machine_learning/__init__.py @@ -0,0 +1,28 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +OPTS = [ + cfg.ListOpt('plugins', + help='Names of enabled machine learning plugins ' + '(example jaccard_correlation)'), + cfg.ListOpt('plugins_path', + default=['vitrage.machine_learning.plugins'], + help='list of base path for notifiers'), + cfg.StrOpt('machine_learning_topic', + default='vitrage.machine_learning', + help='The topic that vitrage-graph uses for graph ' + 'machine learning messages.'), + ] diff --git a/vitrage/machine_learning/plugins/__init__.py b/vitrage/machine_learning/plugins/__init__.py new file mode 100644 index 000000000..d19d99459 --- /dev/null +++ b/vitrage/machine_learning/plugins/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__author__ = 'stack' diff --git a/vitrage/machine_learning/plugins/base.py b/vitrage/machine_learning/plugins/base.py new file mode 100644 index 000000000..449855ee0 --- /dev/null +++ b/vitrage/machine_learning/plugins/base.py @@ -0,0 +1,32 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class MachineLearningBase(object): + + def __init__(self, conf): + self.conf = conf + + @abc.abstractmethod + def process_event(self, data, event_type): + pass + + @staticmethod + @abc.abstractmethod + def get_plugin_name(): + pass diff --git a/vitrage/machine_learning/plugins/jaccard_correlation/__init__.py b/vitrage/machine_learning/plugins/jaccard_correlation/__init__.py new file mode 100644 index 000000000..3f90bc8f3 --- /dev/null +++ b/vitrage/machine_learning/plugins/jaccard_correlation/__init__.py @@ -0,0 +1,34 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + +OPTS = [ + cfg.StrOpt('plugin_path', + default='vitrage.machine_learning.plugins.' + 'jaccard_correlation.alarm_processor.' + 'AlarmDataProcessor', + help='jaccard_correlation class path', + required=True), + cfg.IntOpt('num_of_events_to_flush', default=1000, + help='the amount of events flushes'), + cfg.StrOpt('output_folder', default='/tmp', + help='folder to write all reports to'), + cfg.FloatOpt('correlation_threshold', default=0, + help='threshold of interesting correlations'), + cfg.FloatOpt('high_corr_score', default=0.9, + help='high correlation lower limit'), + cfg.FloatOpt('med_corr_score', default=0.5, + help='medium correlation lower limit'), + ] diff --git a/vitrage/machine_learning/plugins/jaccard_correlation/accumulation_persistor_utils.py b/vitrage/machine_learning/plugins/jaccard_correlation/accumulation_persistor_utils.py new file mode 100644 index 000000000..87ba13140 --- /dev/null +++ b/vitrage/machine_learning/plugins/jaccard_correlation/accumulation_persistor_utils.py @@ -0,0 +1,59 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import namedtuple +from oslo_log import log +import pickle + +LOG = log.getLogger(__name__) + +AccumulatedData = namedtuple('AccumulatedData', ['activity', 'intersection']) + +ACTIVITY_PATH = "/tmp/alarms_activity.txt" +INTERSECT_PATH = "/tmp/alarms_intersections.txt" + + +def load_data(): + try: + with open(ACTIVITY_PATH, 'rb') as activity_f: + alarms_activity = pickle.load(activity_f) + except Exception as e: + LOG.info('Cannot load alarms_activity - %s', e) + return AccumulatedData({}, {}) + + try: + with open(INTERSECT_PATH, 'rb') as intersect_f: + alarms_intersect = pickle.load(intersect_f) + except Exception as e: + LOG.info('Cannot load alarms_intersect - %s', e) + return AccumulatedData({}, {}) + + return AccumulatedData(alarms_activity, alarms_intersect) + + +def save_accumulated_data(data_manager): + activity = data_manager.alarms_activity + intersects = data_manager.alarms_intersects + + try: + with open(ACTIVITY_PATH, 'wb') as activity_f: + pickle.dump(activity, activity_f) + except Exception as e: + LOG.exception('Cannot save alarms_intersect - %s', e) + + try: + with open(INTERSECT_PATH, 'wb') as intersect_f: + pickle.dump(intersects, intersect_f) + except Exception as e: + LOG.exception('Cannot save alarms_intersect - %s', e) diff --git a/vitrage/machine_learning/plugins/jaccard_correlation/alarm_data_accumulator.py b/vitrage/machine_learning/plugins/jaccard_correlation/alarm_data_accumulator.py new file mode 100644 index 000000000..03fc6cfd3 --- /dev/null +++ b/vitrage/machine_learning/plugins/jaccard_correlation/alarm_data_accumulator.py @@ -0,0 +1,80 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +from oslo_log import log + +LOG = log.getLogger(__name__) + + +class AlarmDataAccumulator(object): + + def __init__(self, accumulated_data): + self.active_start_times = {} + self.alarms_activity = accumulated_data.activity + # TODO(annarez): exclude intersections between deduced and it's cause + self.alarms_intersects = accumulated_data.intersection + + def append_active(self, alarm_id, timestamp): + + if alarm_id in self.active_start_times: + LOG.debug("Active alarm {} was started twice. Second time at {}". + format(alarm_id, str(timestamp))) + return + + self.active_start_times[alarm_id] = timestamp + + def append_inactive(self, alarm_id, end_time): + + if alarm_id not in self.active_start_times: + LOG.debug("Alarm {} at {} was deactivated without being active". + format(alarm_id, str(end_time))) + return + + alarm_duration = end_time - self.active_start_times[alarm_id] + self.alarms_activity[alarm_id] = alarm_duration + \ + self.alarms_activity.get(alarm_id, datetime.timedelta(0)) + + start_time = self.active_start_times[alarm_id] + + del self.active_start_times[alarm_id] + self.append_intersect(alarm_id, start_time, end_time) + + def append_intersect(self, alarm_id, start_time, end_time): + + for active_alarm in self.active_start_times.items(): + key = frozenset([alarm_id, active_alarm[0]]) + active_alarm_service = active_alarm[1] + self.alarms_intersects[key] = \ + self.alarms_intersects.get(key, datetime.timedelta(0)) + \ + self.calc_intersect(start_time, end_time, active_alarm_service) + + def calc_intersect(self, inactive_start, inactive_end, active_start): + return inactive_end - max(inactive_start, active_start) + + def flush_accumulations(self): + """flush all active alarms + + empty the data from active_start_times and re-enter all the + currently-active alarms, as if they started now + """ + + now = datetime.datetime.now() + active_alarms = list(self.active_start_times) + + for active_alarm in active_alarms: + self.append_inactive(active_alarm, now) + + for flushed_active_alarm in active_alarms: + self.append_active(flushed_active_alarm, now) diff --git a/vitrage/machine_learning/plugins/jaccard_correlation/alarm_processor.py b/vitrage/machine_learning/plugins/jaccard_correlation/alarm_processor.py new file mode 100644 index 000000000..abc295912 --- /dev/null +++ b/vitrage/machine_learning/plugins/jaccard_correlation/alarm_processor.py @@ -0,0 +1,95 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import namedtuple +from datetime import datetime +from oslo_log import log + +from vitrage.common.constants import NotifierEventTypes +from vitrage.common.constants import VertexProperties as VProps +from vitrage.datasources.alarm_properties import AlarmProperties as AlarmProps +from vitrage.datasources.transformer_base import TIMESTAMP_FORMAT +from vitrage.machine_learning.plugins.base import MachineLearningBase +import vitrage.machine_learning.plugins.jaccard_correlation.\ + accumulation_persistor_utils as APersistor +from vitrage.machine_learning.plugins.jaccard_correlation.\ + alarm_data_accumulator import AlarmDataAccumulator as ADAcummulator +from vitrage.machine_learning.plugins.jaccard_correlation.correlation_manager\ + import CorrelationManager as CM + +LOG = log.getLogger(__name__) + +AlarmID = namedtuple('AlarmID', [VProps.VITRAGE_RESOURCE_ID, + VProps.VITRAGE_RESOURCE_TYPE, + VProps.NAME]) + + +class AlarmDataProcessor(MachineLearningBase): + + @staticmethod + def get_plugin_name(): + return 'jaccard_correlation' + + def __init__(self, conf): + super(AlarmDataProcessor, self).__init__(conf) + self.data_manager = ADAcummulator(APersistor.load_data()) + self.correlation_manager = CM(conf) + self.num_of_events_to_flush = \ + conf.jaccard_correlation.num_of_events_to_flush + self.event_counter = 0 + + def process_event(self, data, event_type): + + if event_type == NotifierEventTypes.ACTIVATE_ALARM_EVENT \ + or event_type == NotifierEventTypes.DEACTIVATE_ALARM_EVENT: + + # TODO(annarez): handle alarms from collectd + if data[VProps.VITRAGE_TYPE] == 'collectd': + return + + self._update_data_accumulator(data) + self.event_counter += 1 + + # flush all data once num_of_events_to_flush is achieved + if self.event_counter == self.num_of_events_to_flush: + LOG.debug("Persisting: {}".format(str(data))) + self.data_manager.flush_accumulations() + APersistor.save_accumulated_data(self.data_manager) + self.correlation_manager.output_correlations(self.data_manager) + self.event_counter = 0 + + def _update_data_accumulator(self, data): + + alarm_name = data[VProps.RAWTEXT] if data.get(VProps.RAWTEXT) else \ + data[VProps.NAME] + + alarm_id, timestamp = \ + self._get_alarm_id_and_timestamp(data, alarm_name) + + if data[VProps.STATE] == AlarmProps.ACTIVE_STATE: + self.data_manager.append_active(alarm_id, timestamp) + else: + self.data_manager.append_inactive(alarm_id, timestamp) + + @staticmethod + def _get_alarm_id_and_timestamp(data, alarm_name): + + alarm_id = AlarmID(data.get(VProps.VITRAGE_RESOURCE_ID), + data.get(VProps.VITRAGE_RESOURCE_TYPE), + alarm_name) + + timestamp = datetime.strptime(data[VProps.UPDATE_TIMESTAMP], + TIMESTAMP_FORMAT) + + return alarm_id, timestamp diff --git a/vitrage/machine_learning/plugins/jaccard_correlation/correlation_collection.py b/vitrage/machine_learning/plugins/jaccard_correlation/correlation_collection.py new file mode 100644 index 000000000..696c90f35 --- /dev/null +++ b/vitrage/machine_learning/plugins/jaccard_correlation/correlation_collection.py @@ -0,0 +1,84 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import defaultdict +from itertools import groupby +from operator import itemgetter +from oslo_log import log + +LOG = log.getLogger(__name__) + + +class CorrelationPriorities(object): + HIGH = "HIGH" + MEDIUM = "MEDIUM" + LOW = "LOW" + + +class AlarmsProperties(object): + ALARM1_RESOURCE_ID = 0 + ALARM1_RESOURCE_TYPE = 1 + ALARM1_NAME = 2 + ALARM2_RESOURCE_ID = 3 + ALARM2_RESOURCE_TYPE = 4 + ALARM2_NAME = 5 + OFFSET_DELTA = 6 + CORR_SCORE = 7 + + +class CorrelationCollection(object): + + def __init__(self, high_corr_score, med_corr_score): + self.correlation_list = [] + self.high_corr_score = high_corr_score + self.med_corr_score = med_corr_score + + def set(self, alarm_1, alarm_2, offset_delta, correlation_score): + + self.correlation_list.append(alarm_1 + alarm_2 + + (offset_delta, correlation_score)) + + def get_aggregated(self): + + pairs_dict = defaultdict(list) + + for alarm_pair in self.correlation_list: + if ((alarm_pair[AlarmsProperties.ALARM1_RESOURCE_TYPE], + alarm_pair[AlarmsProperties.ALARM1_NAME], + alarm_pair[AlarmsProperties.ALARM2_RESOURCE_TYPE], + alarm_pair[AlarmsProperties.ALARM2_NAME])) in pairs_dict: + + pairs_dict[alarm_pair[AlarmsProperties.ALARM1_RESOURCE_TYPE], + alarm_pair[AlarmsProperties.ALARM1_NAME], + alarm_pair[AlarmsProperties.ALARM2_RESOURCE_TYPE], + alarm_pair[AlarmsProperties.ALARM2_NAME]]\ + .append(alarm_pair[AlarmsProperties.CORR_SCORE]) + else: + pairs_dict[alarm_pair[AlarmsProperties.ALARM2_RESOURCE_TYPE], + alarm_pair[AlarmsProperties.ALARM2_NAME], + alarm_pair[AlarmsProperties.ALARM1_RESOURCE_TYPE], + alarm_pair[AlarmsProperties.ALARM1_NAME]].\ + append(alarm_pair[AlarmsProperties.CORR_SCORE]) + + results = [(key, sum((pairs_dict[key])) / float(len(pairs_dict[key]))) + for key in pairs_dict] + + categorize = lambda x: CorrelationPriorities.HIGH \ + if x[1] >= self.high_corr_score \ + else CorrelationPriorities.MEDIUM \ + if x[1] >= self.med_corr_score else CorrelationPriorities.LOW + + return [(key, [(x, y) for x, y in group]) for + key, group in groupby(sorted(results, key=itemgetter(1)), + key=categorize)] diff --git a/vitrage/machine_learning/plugins/jaccard_correlation/correlation_manager.py b/vitrage/machine_learning/plugins/jaccard_correlation/correlation_manager.py new file mode 100644 index 000000000..2e7f87e9d --- /dev/null +++ b/vitrage/machine_learning/plugins/jaccard_correlation/correlation_manager.py @@ -0,0 +1,114 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import time + +from oslo_log import log + +from vitrage.machine_learning.plugins.jaccard_correlation.\ + correlation_collection import CorrelationCollection as CCollection +from vitrage.machine_learning.plugins.jaccard_correlation.\ + correlation_collection import CorrelationPriorities as CPriorities + + +LOG = log.getLogger(__name__) + + +class CorrelationManager(object): + + def __init__(self, conf): + self.high_corr_score = conf.jaccard_correlation.high_corr_score + self.med_corr_score = conf.jaccard_correlation.med_corr_score + self.correlation_threshold = \ + conf.jaccard_correlation.correlation_threshold + self.output_folder = conf.jaccard_correlation.output_folder + self.last_written_file = "" + self.correlation_table = CCollection(self.high_corr_score, + self.med_corr_score) + + def output_correlations(self, accumulated_data): + now = int(time.time()) + report = self._generate_report(accumulated_data) + + self._dump_correlations(str(now) + "_correlations.out", dict(report)) + + @staticmethod + def _jaccard_score(alarm_1, alarm_2, accumulated_data): + key = frozenset([alarm_1, alarm_2]) + intersect = accumulated_data.alarms_intersects.get(key) + + if not intersect: + return 0 + + a1_time = accumulated_data.alarms_activity.get(alarm_1) + a2_time = accumulated_data.alarms_activity.get(alarm_2) + if not a1_time or not a2_time: + LOG.error("One of the alarms given has never been active") + return 0 + + combined = a1_time + a2_time - intersect + # jaccard is intersection / union of alarm times + return intersect.total_seconds() / combined.total_seconds() + + def _generate_report(self, accumulated_data): + + for alarm_1, alarm_2 in accumulated_data.alarms_intersects.keys(): + + jacc_score = \ + self._jaccard_score(alarm_1, alarm_2, accumulated_data) + + if jacc_score >= self.correlation_threshold: + self.correlation_table.set(alarm_1, alarm_2, 0, jacc_score) + + # mean correlations divided to HIGH, MEDIUM and LOW correlation scores + report = self.correlation_table.get_aggregated() + return report + + def _dump_correlations(self, output_path, alarms): + + new_file_name = "{}/{}".format(self.output_folder, output_path) + + try: + with open(new_file_name, 'w') as f: + LOG.info("Correlation manager wrote a new file: {}/{}".format + (self.output_folder, output_path)) + + for correlation_level, correlation_bar \ + in [(CPriorities.HIGH, + " (> {})".format(self.high_corr_score)), + (CPriorities.MEDIUM, + " (> {})".format(self.med_corr_score)), + (CPriorities.LOW, + "(< {})".format(self.med_corr_score))]: + if correlation_level in alarms: + + title = correlation_level + \ + " correlation" + correlation_bar + ":" + + f.write("\n" + title + "\n" + + ("-" * len(title)) + "\n") + + for alarm in alarms[correlation_level]: + f.write("alarm " + alarm[0][1] + " on " + + alarm[0][0] + " <-> alarm " + + alarm[0][3] + " on " + alarm[0][2] + + " with score " + str(alarm[1]) + "\n") + except Exception as e: + LOG.exception('Cannot save correlations - %s', e) + + if os.path.isfile(self.last_written_file): + os.remove(self.last_written_file) + + self.last_written_file = new_file_name diff --git a/vitrage/machine_learning/service.py b/vitrage/machine_learning/service.py new file mode 100644 index 000000000..6f8819eb3 --- /dev/null +++ b/vitrage/machine_learning/service.py @@ -0,0 +1,85 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log +import oslo_messaging as oslo_m +from oslo_service import service as os_service +from oslo_utils import importutils + +from vitrage import messaging +from vitrage.opts import register_opts + +LOG = log.getLogger(__name__) + + +class MachineLearningService(os_service.Service): + + def __init__(self, conf): + super(MachineLearningService, self).__init__() + self.conf = conf + self.machine_learning_plugins = self.get_machine_learning_plugins(conf) + transport = messaging.get_transport(conf) + target = \ + oslo_m.Target(topic=conf.machine_learning.machine_learning_topic) + self.listener = messaging.get_notification_listener( + transport, [target], + [VitrageEventEndpoint(self.machine_learning_plugins)]) + + def start(self): + LOG.info("Vitrage Machine Learning Service - Starting...") + + super(MachineLearningService, self).start() + self.listener.start() + + LOG.info("Vitrage Machine Learning Service - Started!") + + def stop(self, graceful=False): + LOG.info("Vitrage Machine Learning Service - Stopping...") + + self.listener.stop() + self.listener.wait() + super(MachineLearningService, self).stop(graceful) + + LOG.info("Vitrage Machine Learning Service - Stopped!") + + @staticmethod + def get_machine_learning_plugins(conf): + machine_learning_plugins = [] + machine_learning_plugins_names = \ + conf.machine_learning.plugins + if not machine_learning_plugins_names: + LOG.info('There are no Machine Learning plugins in configuration') + return [] + for machine_learning_plugin_name in machine_learning_plugins_names: + register_opts(conf, machine_learning_plugin_name, + conf.machine_learning.plugins_path) + LOG.info('Machine Learning plugin %s started', + machine_learning_plugin_name) + machine_learning_plugins.append(importutils.import_object( + conf[machine_learning_plugin_name].plugin_path, + conf)) + return machine_learning_plugins + + +class VitrageEventEndpoint(object): + + def __init__(self, machine_learning_plugins): + self.machine_learning_plugins = machine_learning_plugins + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + """Endpoint for alarm notifications""" + LOG.info('Vitrage Event Info: event_type %s', event_type) + LOG.info('Vitrage Event Info: payload %s', payload) + for plugin in self.machine_learning_plugins: + plugin.process_event(payload, event_type) diff --git a/vitrage/opts.py b/vitrage/opts.py index 5a00c9198..ea1462064 100644 --- a/vitrage/opts.py +++ b/vitrage/opts.py @@ -23,6 +23,8 @@ import vitrage.datasources import vitrage.entity_graph.consistency import vitrage.evaluator import vitrage.keystone_client +import vitrage.machine_learning +import vitrage.machine_learning.plugins.jaccard_correlation import vitrage.notifier import vitrage.notifier.plugins.snmp import vitrage.os_clients @@ -44,6 +46,10 @@ def list_opts(): ('consistency', vitrage.entity_graph.consistency.OPTS), ('entity_graph', vitrage.entity_graph.OPTS), ('service_credentials', vitrage.keystone_client.OPTS), + ('machine_learning', + vitrage.machine_learning.OPTS), + ('jaccard_correlation', + vitrage.machine_learning.plugins.jaccard_correlation.OPTS), ('snmp', vitrage.notifier.plugins.snmp.OPTS), ('DEFAULT', itertools.chain( vitrage.os_clients.OPTS, diff --git a/vitrage/tests/unit/machine_learning/__init__.py b/vitrage/tests/unit/machine_learning/__init__.py new file mode 100644 index 000000000..8468f6f47 --- /dev/null +++ b/vitrage/tests/unit/machine_learning/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2017 - Alcatel-Lucent +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__author__ = 'stack' diff --git a/vitrage/tests/unit/machine_learning/jaccard_correlation/__init__.py b/vitrage/tests/unit/machine_learning/jaccard_correlation/__init__.py new file mode 100644 index 000000000..d19d99459 --- /dev/null +++ b/vitrage/tests/unit/machine_learning/jaccard_correlation/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__author__ = 'stack' diff --git a/vitrage/tests/unit/machine_learning/jaccard_correlation/test_jaccard_correlation.py b/vitrage/tests/unit/machine_learning/jaccard_correlation/test_jaccard_correlation.py new file mode 100644 index 000000000..e61a45c30 --- /dev/null +++ b/vitrage/tests/unit/machine_learning/jaccard_correlation/test_jaccard_correlation.py @@ -0,0 +1,386 @@ +# Copyright 2017 - Nokia +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os.path +from oslo_config import cfg +import time + +from vitrage.common.constants import EntityCategory +from vitrage.common.constants import VertexProperties as VProps +from vitrage.datasources.aodh import AODH_DATASOURCE +from vitrage.datasources.transformer_base import TIMESTAMP_FORMAT +from vitrage.datasources.zabbix import ZABBIX_DATASOURCE +from vitrage.evaluator.actions import evaluator_event_transformer as evaluator +from vitrage.graph import Vertex +from vitrage.machine_learning.plugins.jaccard_correlation.\ + accumulation_persistor_utils import AccumulatedData as AData +from vitrage.machine_learning.plugins.jaccard_correlation.\ + alarm_data_accumulator import AlarmDataAccumulator as ADAccumulator +from vitrage.machine_learning.plugins.jaccard_correlation.\ + alarm_processor import AlarmDataProcessor as ADProcessor +from vitrage.machine_learning.plugins.jaccard_correlation.\ + alarm_processor import AlarmID +from vitrage.machine_learning.plugins.jaccard_correlation.\ + correlation_collection import CorrelationCollection as CCollection +from vitrage.machine_learning.plugins.jaccard_correlation.correlation_manager \ + import CorrelationManager as CManager +from vitrage.machine_learning.plugins.jaccard_correlation.\ + correlation_collection import CorrelationPriorities as CPriorities +from vitrage.tests import base + + +ACTIVE_TIMESTAMP = datetime.datetime.utcnow() +ACTIVE_TIMESTAMP = ACTIVE_TIMESTAMP.strftime(TIMESTAMP_FORMAT) +INACTIVE_TIMESTAMP = \ + datetime.datetime.utcnow() + datetime.timedelta(minutes=10) +INACTIVE_TIMESTAMP = INACTIVE_TIMESTAMP.strftime(TIMESTAMP_FORMAT) + +DEDUCED_ALARM_1 = Vertex('111', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: evaluator.VITRAGE_DATASOURCE, + VProps.VITRAGE_IS_DELETED: False, + VProps.VITRAGE_IS_PLACEHOLDER: False, + VProps.NAME: 'deduced_alarm_1', + VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '111', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_111', + }) + +AODH_ALARM_1 = Vertex('11', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: AODH_DATASOURCE, + VProps.VITRAGE_IS_DELETED: False, + VProps.VITRAGE_IS_PLACEHOLDER: False, + VProps.NAME: 'aodh_alarm_1', + VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '11', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_11', + }) + +ZABBIX_ALARM_1 = Vertex('1111', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE, + VProps.VITRAGE_IS_DELETED: False, + VProps.VITRAGE_IS_PLACEHOLDER: True, + VProps.NAME: 'zabbix_alarm_1 {}', + VProps.RAWTEXT: 'zabbix_alarm_1', + VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '1111', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_1111', + }) + +ZABBIX_ALARM_2 = Vertex('2222', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE, + VProps.VITRAGE_IS_DELETED: False, + VProps.VITRAGE_IS_PLACEHOLDER: True, + VProps.NAME: 'zabbix_alarm_2 {}', + VProps.RAWTEXT: 'zabbix_alarm_2', + VProps.UPDATE_TIMESTAMP: ACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '2222', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_2222', +}) + +DELETED_DEDUCED_ALARM_1 = Vertex('111', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: evaluator.VITRAGE_DATASOURCE, + VProps.VITRAGE_IS_DELETED: True, + VProps.VITRAGE_IS_PLACEHOLDER: False, + VProps.NAME: 'deduced_alarm_1', + VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '111', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_111', +}) + +DELETED_AODH_ALARM_1 = Vertex('11', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: evaluator.VITRAGE_DATASOURCE, + VProps.VITRAGE_IS_DELETED: True, + VProps.VITRAGE_IS_PLACEHOLDER: False, + VProps.NAME: 'aodh_alarm_1', + VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '11', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_11', +}) + +DELETED_ZABBIX_ALARM_1 = Vertex('1111', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE, + VProps.VITRAGE_IS_DELETED: True, + VProps.VITRAGE_IS_PLACEHOLDER: False, + VProps.NAME: 'zabbix_alarm_1 {}', + VProps.RAWTEXT: 'zabbix_alarm_1', + VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '1111', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_1111', +}) + +DELETED_ZABBIX_ALARM_2 = Vertex('2222', { + VProps.VITRAGE_CATEGORY: EntityCategory.ALARM, + VProps.VITRAGE_TYPE: ZABBIX_DATASOURCE, + VProps.VITRAGE_IS_DELETED: True, + VProps.VITRAGE_IS_PLACEHOLDER: False, + VProps.NAME: 'zabbix_alarm_2 {}', + VProps.RAWTEXT: 'zabbix_alarm_2', + VProps.UPDATE_TIMESTAMP: INACTIVE_TIMESTAMP, + VProps.VITRAGE_RESOURCE_ID: '2222', + VProps.VITRAGE_RESOURCE_TYPE: 'resource_2222', +}) + +ACTIVE_ALARMS = [DEDUCED_ALARM_1, AODH_ALARM_1, + ZABBIX_ALARM_1, ZABBIX_ALARM_2] + +INACTIVE_ALARMS = [DELETED_DEDUCED_ALARM_1, DELETED_AODH_ALARM_1, + DELETED_ZABBIX_ALARM_1, DELETED_ZABBIX_ALARM_2] + + +class JaccardCorrelationTest(base.BaseTest): + OPTS = [ + cfg.StrOpt('output_folder', default='/tmp', + help='folder to write all reports to'), + cfg.FloatOpt('correlation_threshold', default=0, + help='threshold of interesting correlations'), + cfg.FloatOpt('high_corr_score', default=0.9, + help='high correlation lower limit'), + cfg.FloatOpt('med_corr_score', default=0.5, + help='medium correlation lower limit'), + ] + + @classmethod + def setUpClass(cls): + + cls.conf = cfg.ConfigOpts() + cls.conf.register_opts(cls.OPTS, group='jaccard_correlation') + + cls.data_manager = ADAccumulator(AData({}, {})) + cls.collection = \ + CCollection(cls.conf.jaccard_correlation.high_corr_score, + cls.conf.jaccard_correlation.med_corr_score) + cls.correlation_manager = CManager(cls.conf) + cls.activate_timestamps = {} + cls.inactivate_timestamps = {} + cls.alarm_ids = cls._setup_expected_active_alarms_ids() + + @staticmethod + def _setup_expected_active_alarms_ids(): + alarm_ids = [] + for alarm in ACTIVE_ALARMS: + alarm_name = alarm[VProps.RAWTEXT] if alarm.get(VProps.RAWTEXT) \ + else alarm[VProps.NAME] + alarm_id = AlarmID(alarm.get(VProps.VITRAGE_RESOURCE_ID), + alarm.get(VProps.VITRAGE_RESOURCE_TYPE), + alarm_name) + alarm_ids.append(alarm_id) + + return alarm_ids + + def test_jaccard_correlation(self): + self._test_alarm_data_accumulations() + self._test_correlation_collection() + self._test_correlation_manager() + + def _test_alarm_data_accumulations(self): + self._test_append_active() + self._test_flush_accumulations() + self._test_append_inactive() + + def _test_append_active(self): + + expected_active_start_dict = {} + real_alarm_ids = [] + + for alarm in ACTIVE_ALARMS: + + alarm_name = alarm[VProps.RAWTEXT] if alarm.get(VProps.RAWTEXT) \ + else alarm[VProps.NAME] + + alarm_id, timestamp = ADProcessor.\ + _get_alarm_id_and_timestamp(alarm, alarm_name) + + self.activate_timestamps[alarm_id] = timestamp + expected_active_start_dict[alarm_id] = \ + datetime.datetime.strptime(alarm.get(VProps.UPDATE_TIMESTAMP), + TIMESTAMP_FORMAT) + + real_alarm_ids.append(alarm_id) + self.data_manager.append_active(alarm_id, timestamp) + + # assert all alarm ids are right + for i in range(len(self.alarm_ids)): + self.assertEqual(self.alarm_ids[i], real_alarm_ids[i], '') + + self.assertEqual(expected_active_start_dict, + self.data_manager.active_start_times) + + self.assertEqual({}, self.data_manager.alarms_activity) + self.assertEqual({}, self.data_manager.alarms_intersects) + + def _test_flush_accumulations(self): + + prev_active_start_dict = self.data_manager.active_start_times + + time.sleep(2) + self.data_manager.flush_accumulations() + + self.assertEqual(prev_active_start_dict, + self.data_manager.active_start_times) + + expected_activity_dict_len = len(ACTIVE_ALARMS) + self.assertEqual(expected_activity_dict_len, + len(self.data_manager.alarms_activity)) + + # choose 2 + expected_intersections_dict_len = \ + (len(ACTIVE_ALARMS) * (len(ACTIVE_ALARMS) - 1)) / 2 + self.assertEqual(expected_intersections_dict_len, + len(self.data_manager.alarms_intersects)) + + def _test_append_inactive(self): + deleted_alarm_ids = [] + + for alarm in INACTIVE_ALARMS: + alarm_name = alarm[VProps.RAWTEXT] if alarm.get(VProps.RAWTEXT) \ + else alarm[VProps.NAME] + + alarm_id, timestamp = ADProcessor.\ + _get_alarm_id_and_timestamp(alarm, alarm_name) + + expected_alarm_id = \ + AlarmID(alarm.get(VProps.VITRAGE_RESOURCE_ID), + alarm.get(VProps.VITRAGE_RESOURCE_TYPE), + alarm_name) + + self.assertEqual(expected_alarm_id, alarm_id, '') + + self.inactivate_timestamps[alarm_id] = timestamp + deleted_alarm_ids.append(alarm_id) + + self.data_manager.append_inactive(alarm_id, timestamp) + + # assert all deleted alarms has same alarm ids as activated alarms + self.assertEqual(self.alarm_ids, deleted_alarm_ids) + + # all alarm are inactive at this moment + expected_active_start_dict = {} + self.assertEqual(expected_active_start_dict, + self.data_manager.active_start_times) + + expected_activity_dict = {} + + for alarm_id in self.alarm_ids: + expected_activity_dict[alarm_id] = \ + self.inactivate_timestamps[alarm_id]\ + - self.activate_timestamps[alarm_id] + + self.assertEqual(expected_activity_dict, + self.data_manager.alarms_activity) + + expected_intersections_dict = {} + + # all alarms started and ended at the same time, + # intersection time equals activity time + for alarm_id1 in self.alarm_ids: + for alarm_id2 in self.alarm_ids: + # exclude intersection of alarm with itself + if alarm_id1 != alarm_id2: + key = frozenset([alarm_id1, alarm_id2]) + expected_intersections_dict[key] = \ + self.inactivate_timestamps[alarm_id]\ + - self.activate_timestamps[alarm_id] + + self.assertEqual(expected_intersections_dict, + self.data_manager.alarms_intersects) + + def _test_correlation_collection(self): + self._test_correlation_list() + self._test_correlations_aggregation() + self.collection = CCollection(0.9, 0.5) + + def _test_correlation_list(self): + offset_delta = 0 + high_correlation = 0.9 + med_correlation = 0.7 + low_correlation = 0.4 + + correlations = [high_correlation, med_correlation, low_correlation] + alarms_pairs = [] + cnt = 0 + + seen_pairs = [] + + for alarm1 in self.alarm_ids: + for alarm2 in self.alarm_ids: + if alarm1 != alarm2 \ + and frozenset([alarm1, alarm2]) not in seen_pairs: + seen_pairs.append(frozenset([alarm1, alarm2])) + correlation = correlations[cnt % 3] + alarms_pairs.append((alarm1 + alarm2 + + (offset_delta, correlation))) + + self.collection.set(alarm1, alarm2, offset_delta, + correlation) + cnt += 1 + + self.assertEqual(alarms_pairs, self.collection.correlation_list) + + def _test_correlations_aggregation(self): + + aggregated = self.collection.get_aggregated() + cnt_high_correlations = 0 + cnt_med_correlations = 0 + cnt_low_correlations = 0 + + for correlation_level in aggregated: + if correlation_level[0] == CPriorities.HIGH: + cnt_high_correlations = len(correlation_level[1]) + if correlation_level[0] == CPriorities.MEDIUM: + cnt_med_correlations = len(correlation_level[1]) + if correlation_level[0] == CPriorities.LOW: + cnt_low_correlations = len(correlation_level[1]) + + self.assertEqual(cnt_high_correlations, 2, '') + self.assertEqual(cnt_med_correlations, 2, '') + self.assertEqual(cnt_low_correlations, 2, '') + + def _test_correlation_manager(self): + report = [] + self._test_generate_report(report) + self._test_dump_correlations(report) + + def _test_generate_report(self, report): + self.data_manager.flush_accumulations() + report.extend(self.correlation_manager. + _generate_report(self.data_manager)) + + # all intersects correlations are 1 + self.assertEqual(CPriorities.HIGH, report[0][0]) + self.assertEqual(len(self.data_manager.alarms_intersects), + len(report[0][1])) + + def _test_dump_correlations(self, report): + now = int(time.time()) + + self.correlation_manager.\ + _dump_correlations(str(now) + "_correlations_test.out", + report) + + file_path = self.conf.jaccard_correlation.output_folder + '/' + \ + str(now) + "_correlations_test.out" + is_file = os.path.isfile(file_path) + + self.assertEqual(is_file, True) + + if os.path.isfile(file_path): + os.remove(file_path)