add collectd datasource

Implements: blueprint collectd-datasource

Change-Id: I4fc357f241a096419f2b3444e046ae7dd1f11e56
This commit is contained in:
Eyal 2016-12-21 11:47:47 +02:00
parent ca6ecd606e
commit edc28629ca
20 changed files with 857 additions and 3 deletions

View File

@ -0,0 +1,17 @@
category: ALARM
values:
- aggregated values:
priority: 40
original values:
- name: FAILURE
operational_value: CRITICAL
- aggregated values:
priority: 30
original values:
- name: WARNING
operational_value: WARNING
- aggregated values:
priority: 10
original values:
- name: OK
operational_value: OK

View File

@ -0,0 +1,40 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from vitrage.common.constants import UpdateMethod
COLLECTD_DATASOURCE = 'collectd'
OPTS = [
cfg.StrOpt('transformer',
default='vitrage.datasources.collectd.transformer.'
'CollectdTransformer',
help='Collectd transformer class path',
required=True),
cfg.StrOpt('driver',
default='vitrage.datasources.collectd.driver.CollectdDriver',
help='Collectd driver class path',
required=True),
cfg.StrOpt('update_method',
default=UpdateMethod.PUSH,
help='None: updates only via Vitrage periodic snapshots.'
'Pull: updates every [changes_interval] seconds.'
'Push: updates by getting notifications from the'
' datasource itself.',
required=True),
cfg.StrOpt('config_file', default='/etc/vitrage/collectd_conf.yaml',
help='Collectd configuration file'),
]

View File

@ -0,0 +1,24 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collectd
import signal
def init():
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
collectd.register_init(init)

View File

@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
#
# © 2013 Lyft, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common code for collectd python plugins.
"""
import collectd
class CollectDPlugin(object):
"""Base class for collectd plugins written in Python.
Each plugin must have a unique name which must match the name
used for the configuration block in the collectd configuration file.
"""
def __init__(self, name=None):
self.name = name
if name:
collectd.register_config(self.configure, name=self.name)
else:
collectd.register_config(self.configure)
collectd.register_init(self.initialize)
collectd.register_shutdown(self.shutdown)
@staticmethod
def config_to_dict(config):
"""Convert a collectd.Config object to a dictionary. """
def config_to_tuple(config):
"""Convert a collectd.Config object to a tuple. """
if config.children:
return (config.key, dict(config_to_tuple(child)
for child in config.children))
elif len(config.values) > 1:
return config.key, config.values
else:
return config.key, config.values[0]
return dict([config_to_tuple(config)])
def error(self, message):
"""Log an error message to the collectd logger. """
collectd.error('%s plugin: %s' % (self.name, message))
def warning(self, message):
"""Log an warning message to the collectd logger. """
collectd.warning('%s plugin: %s' % (self.name, message))
def notice(self, message):
"""Log an notice message to the collectd logger. """
collectd.notice('%s plugin: %s' % (self.name, message))
def info(self, message):
"""Log an info message to the collectd logger. """
collectd.info('%s plugin: %s' % (self.name, message))
def debug(self, message):
"""Log an debug message to the collectd logger. """
collectd.debug('%s plugin: %s' % (self.name, message))
def configure(self, config, **kwargs):
"""Configuration callback for the plugin.
will be called by collectd with a collectd.
Config object containing configuration data for this plugin from the
collectd configuration file.
"""
# The top level of the configuration is the 'Module' block, which
# is entirely useless, so we set the config attribute to its value,
# which should be the interesting stuff.
self.config = CollectDPlugin.config_to_dict(config)['Module']
def initialize(self):
"""Initialization callback for the plugin.
will be called by collectd with no arguments.
"""
pass
def shutdown(self):
"""Shutdown callback for the plugin.
will be called by collectd with no arguments.
"""
pass
def add_read_callback(self, callback, **kwargs):
"""Register a read callback with collectd.
kwargs will be passed to collectd.register_read.
The callback will be called by collectd without arguments.
"""
collectd.register_read(callback, **kwargs)
def add_write_callback(self, callback, **kwargs):
"""Register a write callback with collectd.
kwargs will be passed to collectd.register_read.
The callback will be called by collectd with a collectd.
Values object as the only argument.
"""
collectd.register_write(callback)
def add_flush_callback(self, callback, **kwargs):
"""Register a flush callback with collectd.
kwargs will be passed to collectd.register_flush.
The callback will be called by collectd with two arguments,
a timeout and an identifier.
"""
collectd.register_flush(callback, **kwargs)
def add_log_callback(self, callback, **kwargs):
"""Register a log callback with collectd.
kwargs will be passed to collectd.register_log.
The callback will be called by collectd with two arguments,
the severity and the message (without a newline at the end)
"""
collectd.register_log(callback, **kwargs)
def add_notification_callback(self, callback, **kwargs):
"""Register a notification callback with collectd.
kwargs will be passed to collectd.register_notification.
The callback will be called by collectd with a collectd.
Notification object as the only argument.
"""
collectd.register_notification(callback, **kwargs)
class PluginError(StandardError):
pass

View File

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
#
# © 2016 Nokia Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Collectd plugin for sending notifications to vitrage
"""
import collectd
import hashlib
import six
from vitrage.datasources.collectd.collectd_vitrage.plugin import CollectDPlugin
from vitrage.datasources.collectd.collectd_vitrage.plugin import PluginError
import uuid
from datetime import datetime
from oslo_config import cfg
import oslo_messaging as messaging
class VitrageNotifier(CollectDPlugin):
"""Collectd plugin for sending notifications to Vitrage. """
def configure(self, config, **kwargs):
super(VitrageNotifier, self).configure(config, **kwargs)
# to be filled later
for key in ('transport_url',):
if key not in self.config.keys():
message = 'Required configuration key %s missing!' % key
self.error(message)
raise PluginError(message)
def initialize(self):
"""Set up the Vitrage API client and add the notification callback. """
url = self.config['transport_url']
transport = messaging.get_transport(cfg.CONF, url)
self.notifier = messaging.Notifier(transport,
driver='messagingv2',
publisher_id='collectd',
topic='vitrage_notifications')
self.add_notification_callback(self.notify)
def notify(self, notification):
"""Send the notification to Vitrage. """
# Use a friendly string instead of a number.
severity = {
collectd.NOTIF_FAILURE: 'FAILURE',
collectd.NOTIF_WARNING: 'WARNING',
collectd.NOTIF_OKAY: 'OK',
}.get(notification.severity)
alarm = notification.host + notification.plugin_instance \
+ notification.type_instance
alarm_uuid = hashlib.md5(six.b(alarm)).hexdigest()
details = {
'host': notification.host,
'plugin': notification.plugin,
'plugin_instance': notification.plugin_instance,
'type': notification.type,
'type_instance': notification.type_instance,
'message': notification.message,
'severity': severity,
'time': notification.time,
'id': alarm_uuid
}
notification_id = str(uuid.uuid4())
self.notifier.info(ctxt={'message_id': notification_id,
'publisher_id': 'collectd',
'timestamp': datetime.utcnow()},
event_type='collectd.alarm.' + severity.lower(),
payload=details)
self.info('notification id %r to vitrage: %r' % (notification_id,
details))
# We have to call the constructor in order to actually register our plugin
# with collectd.
VITRAGE = VitrageNotifier()

View File

@ -0,0 +1,93 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.datasources.alarm_driver_base import AlarmDriverBase
from vitrage.datasources.collectd import COLLECTD_DATASOURCE
from vitrage.datasources.collectd.properties\
import CollectdProperties as CProps
from vitrage.utils import file as file_utils
LOG = log.getLogger(__name__)
class CollectdDriver(AlarmDriverBase):
conf_map = None
def __init__(self, conf):
super(CollectdDriver, self).__init__()
self.conf = conf
if not CollectdDriver.conf_map:
CollectdDriver.conf_map = \
CollectdDriver._configuration_mapping(conf)
def _entity_type(self):
return COLLECTD_DATASOURCE
def _alarm_key(self, alarm):
return alarm['id']
def _get_alarms(self):
return []
def _is_erroneous(self, alarm):
return alarm and alarm[CProps.SEVERITY] != 'OK'
def _status_changed(self, new_alarm, old_alarm):
return new_alarm and old_alarm \
and not new_alarm[CProps.SEVERITY] == old_alarm[CProps.SEVERITY]
def _is_valid(self, alarm):
return alarm[CProps.RESOURCE_TYPE] is not None \
and alarm[CProps.RESOURCE_NAME] is not None
@staticmethod
def _configuration_mapping(conf):
try:
collectd_config_file = conf.collectd['config_file']
collectd_config = file_utils.load_yaml_file(collectd_config_file)
collectd_config_elements = collectd_config['collectd']
mappings = {}
for element_config in collectd_config_elements:
mappings[element_config['collectd_host']] = {
CProps.RESOURCE_TYPE: element_config['type'],
CProps.RESOURCE_NAME: element_config['name']
}
return mappings
except Exception as e:
LOG.exception('failed in init %s ', e)
return {}
def enrich_event(self, event, event_type):
event[DSProps.EVENT_TYPE] = event_type
if CollectdDriver.conf_map:
collectd_host = event['host']
v_resource = CollectdDriver.conf_map[collectd_host]
event[CProps.RESOURCE_NAME] = v_resource[CProps.RESOURCE_NAME]
event[CProps.RESOURCE_TYPE] = v_resource[CProps.RESOURCE_TYPE]
return CollectdDriver.make_pickleable([event], COLLECTD_DATASOURCE,
DatasourceAction.UPDATE)[0]
@staticmethod
def get_event_types():
return ['collectd.alarm.ok', 'collectd.alarm.failure',
'collectd.alarm.warning']

View File

@ -0,0 +1,19 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class CollectdProperties(object):
RESOURCE_TYPE = 'resource_type'
RESOURCE_NAME = 'resource_name'
SEVERITY = 'severity'

View File

@ -0,0 +1,103 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import EdgeLabel
from vitrage.common.constants import EntityCategory
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.alarm_properties import AlarmProperties as AlarmProps
from vitrage.datasources.alarm_transformer_base import AlarmTransformerBase
from vitrage.datasources.collectd import COLLECTD_DATASOURCE
from vitrage.datasources.collectd.properties import\
CollectdProperties as CProps
from vitrage.datasources import transformer_base as tbase
import vitrage.graph.utils as graph_utils
from vitrage.utils.datetime import format_unix_timestamp
LOG = logging.getLogger(__name__)
class CollectdTransformer(AlarmTransformerBase):
def __init__(self, transformers, conf):
super(CollectdTransformer, self).__init__(transformers, conf)
def _create_snapshot_entity_vertex(self, entity_event):
# The Collectd datasource does not support snapshot mode
return None
def _create_update_entity_vertex(self, entity_event):
return self._create_vertex(entity_event)
def _create_vertex(self, entity_event):
entity_event['timestamp'] = format_unix_timestamp(
entity_event['time'], tbase.TIMESTAMP_FORMAT)
update_timestamp = entity_event['timestamp']
sample_timestamp = entity_event[DSProps.SAMPLE_DATE]
entity_state = AlarmProps.INACTIVE_STATE \
if self._ok_status(entity_event) else AlarmProps.ACTIVE_STATE
metadata = {
VProps.NAME: entity_event['message'],
VProps.SEVERITY: entity_event[CProps.SEVERITY],
}
return graph_utils.create_vertex(
self._create_entity_key(entity_event),
entity_category=EntityCategory.ALARM,
entity_type=entity_event[DSProps.ENTITY_TYPE],
entity_state=entity_state,
sample_timestamp=sample_timestamp,
update_timestamp=update_timestamp,
metadata=metadata)
def _create_snapshot_neighbors(self, entity_event):
return self._create_collectd_neighbors(entity_event)
def _create_update_neighbors(self, entity_event):
return self._create_collectd_neighbors(entity_event)
def _create_collectd_neighbors(self, entity_event):
resource_type = entity_event[CProps.RESOURCE_TYPE]
if resource_type:
return [self._create_neighbor(
entity_event,
entity_event[CProps.RESOURCE_NAME],
resource_type,
EdgeLabel.ON,
neighbor_category=EntityCategory.RESOURCE)]
return []
def _ok_status(self, entity_event):
return entity_event[CProps.SEVERITY] == 'OK'
def _create_entity_key(self, entity_event):
entity_type = entity_event[DSProps.ENTITY_TYPE]
alarm_id = entity_event['id']
resource_name = entity_event[CProps.RESOURCE_NAME]
return tbase.build_key(self._key_values(entity_type,
resource_name,
alarm_id))
def get_type(self):
return COLLECTD_DATASOURCE

View File

@ -458,6 +458,27 @@ def simple_doctor_alarm_generators(update_vals=None):
return tg.get_trace_generators(test_entity_spec_list)
def simple_collectd_alarm_generators(update_vals=None):
"""A function for returning Collectd alarm event generators.
Returns generators for a given number of Collectd alarms.
:param update_vals: preset values for ALL update events
:return: generators for alarms as specified
"""
test_entity_spec_list = [({
tg.DYNAMIC_INFO_FKEY: tg.DRIVER_COLLECTD_UPDATE_D,
tg.STATIC_INFO_FKEY: None,
tg.EXTERNAL_INFO_KEY: update_vals,
tg.MAPPING_KEY: None,
tg.NAME_KEY: 'Collectd alarm generator',
tg.NUM_EVENTS: 1
})]
return tg.get_trace_generators(test_entity_spec_list)
def simple_aodh_alarm_notification_generators(alarm_num,
update_events=0,
update_vals=None):

View File

@ -210,3 +210,25 @@ def simple_doctor_alarm_generators(update_vals=None):
})]
return tg.get_trace_generators(test_entity_spec_list)
def simple_collectd_alarm_generators(update_vals=None):
"""A function for returning Collectd alarm event generators.
Returns generators for a given number of Collectd alarms.
:param update_vals: preset values for ALL update events
:return: generators for alarms as specified
"""
test_entity_spec_list = [({
tg.DYNAMIC_INFO_FKEY: tg.TRANS_COLLECTD_UPDATE_D,
tg.DYNAMIC_INFO_FPATH: tg.MOCK_TRANSFORMER_PATH,
tg.STATIC_INFO_FKEY: None,
tg.EXTERNAL_INFO_KEY: update_vals,
tg.MAPPING_KEY: None,
tg.NAME_KEY: 'Collectd alarm generator',
tg.NUM_EVENTS: 1
})]
return tg.get_trace_generators(test_entity_spec_list)

View File

@ -46,6 +46,7 @@ MOCK_DRIVER_PATH = '%s/mock_configurations/driver' % \
utils.get_resources_dir()
DRIVER_AODH_UPDATE_D = 'driver_aodh_update_dynamic.json'
DRIVER_DOCTOR_UPDATE_D = 'driver_doctor_update_dynamic.json'
DRIVER_COLLECTD_UPDATE_D = 'driver_collectd_update_dynamic.json'
DRIVER_HOST_SNAPSHOT_D = 'driver_host_snapshot_dynamic.json'
DRIVER_INST_SNAPSHOT_D = 'driver_inst_snapshot_dynamic.json'
DRIVER_INST_SNAPSHOT_S = 'driver_inst_snapshot_static.json'
@ -68,6 +69,7 @@ MOCK_TRANSFORMER_PATH = '%s/mock_configurations/transformer' % \
TRANS_AODH_SNAPSHOT_D = 'transformer_aodh_snapshot_dynamic.json'
TRANS_AODH_UPDATE_D = 'transformer_aodh_update_dynamic.json'
TRANS_DOCTOR_UPDATE_D = 'transformer_doctor_update_dynamic.json'
TRANS_COLLECTD_UPDATE_D = 'transformer_collectd_update_dynamic.json'
TRANS_INST_SNAPSHOT_D = 'transformer_inst_snapshot_dynamic.json'
TRANS_INST_SNAPSHOT_S = 'transformer_inst_snapshot_static.json'
TRANS_HOST_SNAPSHOT_D = 'transformer_host_snapshot_dynamic.json'
@ -111,6 +113,7 @@ class EventTraceGenerator(object):
static_info_parsers = \
{DRIVER_AODH_UPDATE_D: _get_aodh_alarm_update_driver_values,
DRIVER_DOCTOR_UPDATE_D: _get_doctor_update_driver_values,
DRIVER_COLLECTD_UPDATE_D: _get_collectd_update_driver_values,
DRIVER_INST_SNAPSHOT_D: _get_vm_snapshot_driver_values,
DRIVER_INST_UPDATE_D: _get_vm_update_driver_values,
DRIVER_HOST_SNAPSHOT_D: _get_host_snapshot_driver_values,
@ -128,6 +131,7 @@ class EventTraceGenerator(object):
TRANS_AODH_SNAPSHOT_D: _get_trans_aodh_alarm_snapshot_values,
TRANS_AODH_UPDATE_D: _get_trans_aodh_alarm_snapshot_values,
TRANS_DOCTOR_UPDATE_D: _get_trans_doctor_alarm_update_values,
TRANS_COLLECTD_UPDATE_D: _get_trans_collectd_alarm_update_values,
TRANS_INST_SNAPSHOT_D: _get_trans_vm_snapshot_values,
TRANS_HOST_SNAPSHOT_D: _get_trans_host_snapshot_values,
TRANS_ZONE_SNAPSHOT_D: _get_trans_zone_snapshot_values}
@ -258,6 +262,17 @@ def _get_doctor_update_driver_values(spec):
return [combine_data(None, None, spec.get(EXTERNAL_INFO_KEY, None))]
def _get_collectd_update_driver_values(spec):
"""Generates the static driver values for Collectd monitor notification.
:param spec: specification of event generation.
:type spec: dict
:return: list of notifications of Doctor monitor
:rtype: list
"""
return [combine_data(None, None, spec.get(EXTERNAL_INFO_KEY, None))]
def _get_zone_snapshot_driver_values(spec):
"""Generates the static driver values for each zone.
@ -653,6 +668,23 @@ def _get_trans_doctor_alarm_update_values(spec):
None, spec.get(EXTERNAL_INFO_KEY, None))]
def _get_trans_collectd_alarm_update_values(spec):
"""Generates the dynamic transformer values for a Collectd alarm
:param spec: specification of event generation.
:type spec: dict
:return: list of dynamic transformer values for a Collectd alarm
:rtype: list with one alarm
"""
static_info_re = None
if spec[STATIC_INFO_FKEY] is not None:
static_info_re = utils.load_specs(spec[STATIC_INFO_FKEY])
return [combine_data(static_info_re,
None, spec.get(EXTERNAL_INFO_KEY, None))]
def combine_data(static_info_re, mapping_info, external_info):
if external_info:
mapping_info = utils.merge_vals(mapping_info, external_info)

View File

@ -0,0 +1,11 @@
{
"host": "compute-1",
"plugin": "ovs_events",
"plugin_instance": "br-ex",
"type":"gauge",
"type_instance": "link_status",
"message": "link state of \"br-ex\" interface has been changed to \"WARNING\"",
"severity": "WARNING",
"time": "1482409029.062524",
"id": "46c7eba7753efb0e6f6a8de24c949c52"
}

View File

@ -0,0 +1,15 @@
{
"host": "compute-1",
"plugin": "ovs_events",
"vitrage_entity_type" : "collectd",
"vitrage_datasource_action" : "update",
"resource_type": "nova.host",
"resource_name": "compute-1",
"plugin_instance": "br-ex",
"type":"gauge",
"type_instance": "link_status",
"message": "link state of \"br-ex\" interface has been changed to \"WARNING\"",
"severity": "WARNING",
"time": "1482409029.062524",
"id": "46c7eba7753efb0e6f6a8de24c949c52"
}

View File

@ -0,0 +1,87 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from oslo_config import cfg
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.datasources.collectd.driver import CollectdDriver
from vitrage.datasources.collectd.properties \
import CollectdProperties as CProps
from vitrage.tests import base
from vitrage.tests.mocks import mock_driver
# noinspection PyProtectedMember
WARN_SEVERITY = 'warning'
WARNING_EVENT_TYPE = 'collectd.alarm.warning'
HOST = 'compute-1'
class TestCollectdDriver(base.BaseTest):
OPTS = []
# noinspection PyPep8Naming
@classmethod
def setUpClass(cls):
cls.conf = cfg.ConfigOpts()
cls.conf.register_opts(cls.OPTS, group='collectd')
# noinspection PyAttributeOutsideInit
def setUp(self):
super(TestCollectdDriver, self).setUp()
self.driver = CollectdDriver(self.conf)
def test_enrich_event_with_alarm_up(self):
now = datetime.now().isoformat()
event = self._enrich_event(now, HOST,
WARN_SEVERITY,
WARNING_EVENT_TYPE)
self._assert_event_equal(event, WARNING_EVENT_TYPE,
HOST, WARN_SEVERITY, now)
def _enrich_event(self, time_now, hostname, severity, event_type):
event = self._generate_event(time_now, hostname, severity)
event = self.driver.enrich_event(event, event_type)
return event
@staticmethod
def _generate_event(time, hostname, severity):
update_vals = {}
if hostname:
update_vals['host'] = hostname
if severity:
update_vals[CProps.SEVERITY] = severity
if time:
update_vals['time'] = time
generators = mock_driver.simple_doctor_alarm_generators(
update_vals=update_vals)
return mock_driver.generate_sequential_events_list(generators)[0]
def _assert_event_equal(self,
event,
expected_event_type,
expected_hostname,
expected_severity,
expected_sample_date):
self.assertIsNotNone(event, 'No event returned')
self.assertEqual(expected_hostname, event['host'])
self.assertEqual(expected_severity, event[CProps.SEVERITY])
self.assertEqual(expected_sample_date, event['time'])
self.assertEqual(expected_event_type, event[DSProps.EVENT_TYPE])

View File

@ -0,0 +1,111 @@
# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_config import cfg
from oslo_log import log as logging
from vitrage.common.constants import UpdateMethod
from vitrage.datasources.collectd import COLLECTD_DATASOURCE
from vitrage.datasources.collectd.transformer import CollectdTransformer
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
from vitrage.datasources.nova.host.transformer import HostTransformer
from vitrage.tests.mocks import mock_transformer
from vitrage.tests.unit.datasources.test_alarm_transformer_base import \
BaseAlarmTransformerTest
from vitrage.utils.datetime import format_unix_timestamp
LOG = logging.getLogger(__name__)
# noinspection PyProtectedMember
class TestCollectdTransformer(BaseAlarmTransformerTest):
OPTS = [
cfg.StrOpt('update_method',
default=UpdateMethod.PUSH),
]
# noinspection PyAttributeOutsideInit,PyPep8Naming
@classmethod
def setUpClass(cls):
cls.transformers = {}
cls.conf = cfg.ConfigOpts()
cls.conf.register_opts(cls.OPTS, group=COLLECTD_DATASOURCE)
cls.conf.register_opts(cls.OPTS, group=NOVA_HOST_DATASOURCE)
cls.transformers[COLLECTD_DATASOURCE] = \
CollectdTransformer(cls.transformers, cls.conf)
cls.transformers[NOVA_HOST_DATASOURCE] = \
HostTransformer(cls.transformers, cls.conf)
def test_create_update_entity_vertex(self):
# Test setup
time1 = time.time()
host1 = 'compute-1'
event = self._generate_event(time1, host1, 'WARNING')
self.assertIsNotNone(event)
# Test action
transformer = self.transformers[COLLECTD_DATASOURCE]
wrapper = transformer.transform(event)
# Test assertions
self._validate_vertex_props(wrapper.vertex, event)
# Validate the neighbors: only one valid host neighbor
self._validate_host_neighbor(wrapper,
transformer._create_entity_key(event),
host1)
# Validate the expected action on the graph - update or delete
self._validate_graph_action(wrapper)
# Create an event with status 'UP'
time2 = time.time()
host2 = 'compute-2'
event = self._generate_event(time2, host2, 'OK')
self.assertIsNotNone(event)
# Test action
transformer = self.transformers[COLLECTD_DATASOURCE]
wrapper = transformer.transform(event)
# Test assertions
self._validate_vertex_props(wrapper.vertex, event)
self._validate_host_neighbor(wrapper,
transformer._create_entity_key(event),
host2)
self._validate_graph_action(wrapper)
def _validate_vertex_props(self, vertex, event):
timestamp = format_unix_timestamp(event['time'])
self._validate_alarm_vertex_props(vertex,
event['message'],
COLLECTD_DATASOURCE,
timestamp)
@staticmethod
def _generate_event(time, hostname, severity):
update_vals = {'host': hostname, 'severity': severity, 'time': time,
'vitrage_sample_date': format_unix_timestamp(time),
'resource_name': hostname}
generators = mock_transformer.simple_collectd_alarm_generators(
update_vals=update_vals)
return mock_transformer.generate_random_events_list(generators)[0]
def _is_erroneous(self, vertex):
return vertex['severity'] != 'OK'

View File

@ -59,10 +59,11 @@ class BaseAlarmTransformerTest(BaseTransformerTest):
properties = {
VProps.ID: host_name,
VProps.TYPE: NOVA_HOST_DATASOURCE,
VProps.CATEGORY: EntityCategory.RESOURCE,
VProps.SAMPLE_TIMESTAMP: wrapper.vertex[VProps.SAMPLE_TIMESTAMP],
}
expected_neighbor = host_transformer.\
create_placeholder_vertex(**properties)
expected_neighbor = host_transformer. \
create_neighbor_placeholder_vertex(**properties)
self.assertEqual(expected_neighbor, host_neighbor.vertex)

View File

@ -18,6 +18,9 @@ from datetime import datetime
from oslo_utils import timeutils
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def utcnow(with_timezone=True):
"""Better version of utcnow() that returns utcnow with a correct TZ."""
return timeutils.utcnow(with_timezone)
@ -28,6 +31,6 @@ def change_time_str_format(timestamp_str, old_format, new_format):
return utc.strftime(new_format)
def format_unix_timestamp(timestamp, date_format):
def format_unix_timestamp(timestamp, date_format=TIMESTAMP_FORMAT):
return datetime.fromtimestamp(float(timestamp)) \
.strftime(date_format)