Created the collectd-aodh-plugin
- Provided code that creates the collectd-aodh-plugin. Aodh alarms are created based on collectd notifications. Event alarms are created. - Included a .conf file for this change. - Added changes to the base.py file for the meters to facilitate this change. - Included a reno for this change. - Included tests for this change. Change-Id: I7db01df69104aded199a8c43e8c7b433ff549763
This commit is contained in:
parent
457fb7417e
commit
1933ec8f29
0
collectd_ceilometer/aodh/__init__.py
Normal file
0
collectd_ceilometer/aodh/__init__.py
Normal file
56
collectd_ceilometer/aodh/notifier.py
Normal file
56
collectd_ceilometer/aodh/notifier.py
Normal file
@ -0,0 +1,56 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Aodh collectd plugin implementation."""
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from collectd_ceilometer.aodh.sender import Sender
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Notifier(object):
|
||||
"""Aodh notifier."""
|
||||
|
||||
def __init__(self, meters, config):
|
||||
"""Initialize Notifier."""
|
||||
self._meters = meters
|
||||
self._sender = Sender()
|
||||
self._config = config
|
||||
|
||||
def notify(self, vl, data):
|
||||
"""Collect data from collectd."""
|
||||
# take the plugin (specialized or default) for parsing the data
|
||||
notification = self._meters.get(vl.plugin)
|
||||
# prepare all data related to the sample
|
||||
metername = notification.meter_name(vl)
|
||||
message = notification.message(vl)
|
||||
severity = notification.severity(vl)
|
||||
resource_id = notification.resource_id(vl)
|
||||
timestamp = datetime.datetime.utcfromtimestamp(vl.time).isoformat()
|
||||
|
||||
LOGGER.debug(
|
||||
'Writing: plugin="%s", message="%s", severity="%s", time="%s',
|
||||
vl.plugin, message, severity, timestamp)
|
||||
|
||||
self._send_data(metername, severity, resource_id, message)
|
||||
|
||||
def _send_data(self, metername, severity, resource_id, message):
|
||||
"""Send data to Aodh."""
|
||||
LOGGER.debug('Sending alarm for %s', metername)
|
||||
self._sender.send(metername, severity, resource_id, message)
|
85
collectd_ceilometer/aodh/plugin.py
Normal file
85
collectd_ceilometer/aodh/plugin.py
Normal file
@ -0,0 +1,85 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Aodh collectd plugin."""
|
||||
|
||||
import logging
|
||||
|
||||
try:
|
||||
# pylint: disable=import-error
|
||||
import collectd
|
||||
# pylint: enable=import-error
|
||||
except ImportError:
|
||||
collectd = None # when running unit tests collectd is not avaliable
|
||||
|
||||
import collectd_ceilometer
|
||||
from collectd_ceilometer.aodh.notifier import Notifier
|
||||
from collectd_ceilometer.common.logger import CollectdLogHandler
|
||||
from collectd_ceilometer.common.meters import MeterStorage
|
||||
from collectd_ceilometer.common.settings import Config
|
||||
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
ROOT_LOGGER = logging.getLogger(collectd_ceilometer.__name__)
|
||||
|
||||
|
||||
def register_plugin(collectd):
|
||||
"""Bind plugin hooks to collectd and viceversa."""
|
||||
config = Config.instance()
|
||||
|
||||
# Setup loggging
|
||||
log_handler = CollectdLogHandler(collectd=collectd)
|
||||
log_handler.cfg = config
|
||||
ROOT_LOGGER.addHandler(log_handler)
|
||||
ROOT_LOGGER.setLevel(logging.NOTSET)
|
||||
|
||||
# Creates collectd plugin instance
|
||||
instance = Plugin(collectd=collectd, config=config)
|
||||
|
||||
# Register plugin callbacks
|
||||
collectd.register_config(instance.config)
|
||||
collectd.register_shutdown(instance.shutdown)
|
||||
collectd.register_notification(instance.notify)
|
||||
|
||||
|
||||
class Plugin(object):
|
||||
"""Aodh plugin with collectd callbacks."""
|
||||
|
||||
# NOTE: this is a multithreaded class
|
||||
|
||||
def __init__(self, collectd, config):
|
||||
"""Plugin instance."""
|
||||
self._config = config
|
||||
self._meters = MeterStorage(collectd=collectd)
|
||||
self._notifier = Notifier(self._meters, config=config)
|
||||
|
||||
def config(self, cfg):
|
||||
"""Configuration callback.
|
||||
|
||||
@param cfg configuration node provided by collectd
|
||||
"""
|
||||
self._config.read(cfg)
|
||||
|
||||
def notify(self, vl, data=None):
|
||||
"""Notification callback."""
|
||||
LOGGER.info("Notification")
|
||||
self._notifier.notify(vl, data)
|
||||
|
||||
def shutdown(self):
|
||||
"""Shutdown callback."""
|
||||
LOGGER.info("SHUTDOWN")
|
||||
|
||||
|
||||
if collectd:
|
||||
register_plugin(collectd=collectd)
|
311
collectd_ceilometer/aodh/sender.py
Normal file
311
collectd_ceilometer/aodh/sender.py
Normal file
@ -0,0 +1,311 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Aodh collectd plugin implementation."""
|
||||
|
||||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import collectd_ceilometer
|
||||
from collectd_ceilometer.common.keystone_light import ClientV3
|
||||
from collectd_ceilometer.common.keystone_light import KeystoneException
|
||||
from collectd_ceilometer.common.settings import Config
|
||||
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
from requests.exceptions import RequestException
|
||||
import six
|
||||
import threading
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
ROOT_LOGGER = logging.getLogger(collectd_ceilometer.__name__)
|
||||
|
||||
# HTTP status codes
|
||||
HTTP_CREATED = 201
|
||||
HTTP_UNAUTHORIZED = 401
|
||||
HTTP_NOT_FOUND = 404
|
||||
|
||||
|
||||
class Sender(object):
|
||||
"""Sends the JSON serialized data to Aodh."""
|
||||
|
||||
def __init__(self):
|
||||
"""Create the Sender instance.
|
||||
|
||||
The cofiguration must be initialized before the object is created.
|
||||
"""
|
||||
self._url_base = None
|
||||
self._keystone = None
|
||||
self._auth_token = None
|
||||
self._auth_lock = threading.Lock()
|
||||
self._failed_auth = False
|
||||
self._alarm_ids = {}
|
||||
self._alarm_names = list()
|
||||
|
||||
def _authenticate(self):
|
||||
"""Authenticate and renew the authentication token."""
|
||||
# if auth_token is available, just return it
|
||||
if self._auth_token is not None:
|
||||
return self._auth_token
|
||||
|
||||
# aquire the authentication lock
|
||||
with self._auth_lock:
|
||||
# re-check the auth_token as another thread could set it
|
||||
if self._auth_token is not None:
|
||||
return self._auth_token
|
||||
|
||||
LOGGER.debug('Authenticating request')
|
||||
# pylint: disable=broad-except
|
||||
try:
|
||||
# create a keystone client if it doesn't exist
|
||||
if self._keystone is None:
|
||||
cfg = Config.instance()
|
||||
self._keystone = ClientV3(
|
||||
auth_url=cfg.OS_AUTH_URL,
|
||||
username=cfg.OS_USERNAME,
|
||||
password=cfg.OS_PASSWORD,
|
||||
tenant_name=cfg.OS_TENANT_NAME
|
||||
)
|
||||
# store the authentication token
|
||||
self._auth_token = self._keystone.auth_token
|
||||
|
||||
# get the uri of service endpoint
|
||||
endpoint = self._get_endpoint("aodh")
|
||||
|
||||
self._url_base = "{}/v2/alarms/%s/state".format(endpoint)
|
||||
|
||||
LOGGER.info('Authenticating request - success')
|
||||
self._failed_auth = False
|
||||
|
||||
except KeystoneException as exc:
|
||||
log_level = logging.DEBUG
|
||||
|
||||
if not self._failed_auth:
|
||||
|
||||
log_level = logging.ERROR
|
||||
LOGGER.error(
|
||||
'Suspending error logs until successful auth'
|
||||
)
|
||||
|
||||
LOGGER.log(log_level, 'Authentication error: %s',
|
||||
six.text_type(exc),
|
||||
exc_info=0)
|
||||
|
||||
if exc.response:
|
||||
LOGGER.debug('Response: %s', exc.response)
|
||||
|
||||
self._auth_token = None
|
||||
self._failed_auth = True
|
||||
|
||||
return self._auth_token
|
||||
|
||||
def send(self, metername, severity, resource_id, message):
|
||||
"""Send the payload to Aodh."""
|
||||
# get the auth_token
|
||||
auth_token = self._authenticate()
|
||||
LOGGER.info('Auth_token: %s',
|
||||
auth_token,
|
||||
)
|
||||
# if auth_token is not set, there is nothing to do
|
||||
if auth_token is None:
|
||||
LOGGER.debug('Unable to send data. Not authenticated')
|
||||
return
|
||||
|
||||
if self._url_base is None:
|
||||
LOGGER.debug(
|
||||
'Unable to send data. Missing endpoint from ident server')
|
||||
return
|
||||
|
||||
# Create alarm name
|
||||
alarm_name = self._get_alarm_name(metername, resource_id)
|
||||
|
||||
# check for and/or get alarm_id
|
||||
alarm_id = self._get_alarm_id(alarm_name, severity, metername, message)
|
||||
|
||||
if alarm_id is not None:
|
||||
result = self._update_alarm(alarm_id, message, auth_token)
|
||||
else:
|
||||
result = None
|
||||
|
||||
if result is None:
|
||||
return
|
||||
|
||||
LOGGER.info('Result: %s %s',
|
||||
six.text_type(result.status_code),
|
||||
result.text)
|
||||
|
||||
# if the request failed due to an auth error
|
||||
if result.status_code == HTTP_UNAUTHORIZED:
|
||||
# reset the auth token in order to force the subsequent
|
||||
# _authenticate() call to renew it
|
||||
# Here, it can happen that the token is reset right after
|
||||
# another thread has finished the authentication and thus
|
||||
# the authentication may be performed twice
|
||||
self._auth_token = None
|
||||
|
||||
# renew the authentication token
|
||||
auth_token = self._authenticate()
|
||||
|
||||
if auth_token is not None:
|
||||
if alarm_id is not None:
|
||||
result = self._update_alarm(alarm_id, message, auth_token)
|
||||
else:
|
||||
result = None
|
||||
|
||||
if result.status_code == HTTP_NOT_FOUND:
|
||||
LOGGER.debug("Received 404 error when submitting %s sample, \
|
||||
creating a new metric",
|
||||
alarm_name)
|
||||
|
||||
# check for and/or get alarm_id
|
||||
alarm_id = self._get_alarm_id(alarm_name, severity,
|
||||
metername, message)
|
||||
|
||||
LOGGER.info('alarmname: %s, alarm_id: %s', alarm_name, alarm_id)
|
||||
if alarm_id is not None:
|
||||
result = self._update_alarm(alarm_id, message, auth_token)
|
||||
else:
|
||||
result = None
|
||||
|
||||
if result.status_code == HTTP_CREATED:
|
||||
LOGGER.debug('Result: %s', HTTP_CREATED)
|
||||
else:
|
||||
LOGGER.info('Result: %s %s',
|
||||
result.status_code,
|
||||
result.text)
|
||||
|
||||
def _get_endpoint(self, service):
|
||||
"""Get the uri of service endpoint."""
|
||||
endpoint = self._keystone.get_service_endpoint(
|
||||
service,
|
||||
Config.instance().CEILOMETER_URL_TYPE)
|
||||
return endpoint
|
||||
|
||||
def _get_alarm_id(self, alarm_name, severity, metername, message):
|
||||
"""Check for existing alarm and its id or create a new one."""
|
||||
try:
|
||||
return self._alarm_ids[alarm_name]
|
||||
|
||||
except KeyError as ke:
|
||||
LOGGER.warn(ke)
|
||||
|
||||
endpoint = self._get_endpoint("aodh")
|
||||
if alarm_name not in self._alarm_names:
|
||||
LOGGER.warn('No known ID for %s', alarm_name)
|
||||
self._alarm_names.append(alarm_name)
|
||||
self._alarm_ids[alarm_name] = \
|
||||
self._create_alarm(endpoint, severity,
|
||||
metername, alarm_name, message)
|
||||
return None
|
||||
|
||||
def _create_alarm(self, endpoint, severity, metername,
|
||||
alarm_name, message):
|
||||
"""Create a new alarm with a new alarm_id."""
|
||||
url = "{}/v2/alarms/".format(endpoint)
|
||||
|
||||
rule = {'event_type': metername, }
|
||||
payload = json.dumps({'state': self._get_alarm_state(message),
|
||||
'name': alarm_name,
|
||||
'severity': severity,
|
||||
'type': "event",
|
||||
'event_rule': rule,
|
||||
})
|
||||
|
||||
result = self._perform_post_request(url, payload, self._auth_token)
|
||||
alarm_id = json.loads(result.text)['alarm_id']
|
||||
LOGGER.debug("alarm_id=%s", alarm_id)
|
||||
return alarm_id
|
||||
|
||||
def _get_alarm_state(self, message):
|
||||
"""Get the state of the alarm."""
|
||||
message = message.split()
|
||||
if 'above' in message:
|
||||
alarm_state = "alarm"
|
||||
elif 'within' in message:
|
||||
alarm_state = "ok"
|
||||
else:
|
||||
alarm_state = "insufficient data"
|
||||
return alarm_state
|
||||
|
||||
def _get_alarm_name(self, metername, resource_id):
|
||||
"""Get the alarm name."""
|
||||
alarm_name = metername + "(" + resource_id + ")"
|
||||
return alarm_name
|
||||
|
||||
def _update_alarm(self, alarm_id, message, auth_token):
|
||||
"""Perform the alarm update."""
|
||||
url = self._url_base % (alarm_id)
|
||||
# create the payload and update the state of the alarm
|
||||
payload = json.dumps(self._get_alarm_state(message))
|
||||
return self._perform_update_request(url, auth_token, payload)
|
||||
|
||||
@classmethod
|
||||
def _perform_post_request(cls, url, payload, auth_token):
|
||||
"""Perform the POST request."""
|
||||
LOGGER.debug('Performing request to %s', url)
|
||||
|
||||
# request headers
|
||||
headers = {'X-Auth-Token': auth_token,
|
||||
'Content-type': 'application/json'}
|
||||
# perform request and return its result
|
||||
response = None
|
||||
try:
|
||||
LOGGER.debug(
|
||||
"Performing request to: %s with data=%s and headers=%s",
|
||||
url, payload, headers)
|
||||
|
||||
response = requests.post(
|
||||
url, data=payload, headers=headers,
|
||||
timeout=(Config.instance().CEILOMETER_TIMEOUT / 1000.))
|
||||
LOGGER.info('Response: %s: %s',
|
||||
response.status_code, response.text
|
||||
)
|
||||
except RequestException as exc:
|
||||
LOGGER.error('aodh request error: %s', six.text_type(exc))
|
||||
finally:
|
||||
if response is not None:
|
||||
LOGGER.debug(
|
||||
"Returning response from _perform_post_request(): %s",
|
||||
response.status_code)
|
||||
return response
|
||||
|
||||
@classmethod
|
||||
def _perform_update_request(cls, url, auth_token, payload):
|
||||
"""Perform the PUT/update request."""
|
||||
LOGGER.debug('Performing request to %s', url)
|
||||
|
||||
# request headers
|
||||
headers = {'X-Auth-Token': auth_token,
|
||||
'Content-type': 'application/json'}
|
||||
# perform request and return its result
|
||||
response = None
|
||||
try:
|
||||
LOGGER.debug(
|
||||
"Performing request to: %s with data=%s and headers=%s",
|
||||
url, payload, headers)
|
||||
response = requests.put(
|
||||
url, data=payload, headers=headers,
|
||||
timeout=(Config.instance().CEILOMETER_TIMEOUT / 1000.))
|
||||
LOGGER.info('Response: %s: %s',
|
||||
response.status_code, response.text
|
||||
)
|
||||
except RequestException as exc:
|
||||
LOGGER.error('aodh request error: %s', six.text_type(exc))
|
||||
finally:
|
||||
if response is not None:
|
||||
LOGGER.debug(
|
||||
'Returning response from _perform_update_request(): %s',
|
||||
response.status_code)
|
||||
return response
|
@ -11,7 +11,8 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Default collectd meter"""
|
||||
|
||||
"""Default collectd meter."""
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
@ -22,35 +23,35 @@ LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Meter(object):
|
||||
"""Default collectd meter"""
|
||||
"""Default collectd meter."""
|
||||
|
||||
def __init__(self, collectd):
|
||||
"""Instantiate meter instance."""
|
||||
self._collectd = collectd
|
||||
|
||||
def meter_name(self, vl):
|
||||
"""Return meter name"""
|
||||
"""Return meter name."""
|
||||
# pylint: disable=no-self-use
|
||||
resources = [vl.plugin, vl.type]
|
||||
return '.'.join([i for i in resources if i])
|
||||
|
||||
def hostname(self, vl):
|
||||
"""Get host name"""
|
||||
"""Get host name."""
|
||||
# pylint: disable=no-self-use
|
||||
return vl.host
|
||||
|
||||
def resource_id(self, vl):
|
||||
"""Get resource ID"""
|
||||
|
||||
"""Get resource ID."""
|
||||
resources = [self.hostname(vl), vl.plugin_instance, vl.type_instance]
|
||||
return '-'.join([i for i in resources if i])
|
||||
|
||||
def unit(self, vl):
|
||||
"""Get meter unit"""
|
||||
"""Get meter unit."""
|
||||
# pylint: disable=no-self-use
|
||||
return Config.instance().unit(vl.plugin, vl.type)
|
||||
|
||||
def sample_type(self, vl):
|
||||
"""Translate from collectd counter type to Ceilometer type"""
|
||||
"""Translate from collectd counter type to Ceilometer type."""
|
||||
types = {"gauge": "gauge",
|
||||
"derive": "delta",
|
||||
"absolute": "cumulative",
|
||||
@ -66,3 +67,15 @@ class Meter(object):
|
||||
collectd_type = "gauge"
|
||||
|
||||
return types[collectd_type]
|
||||
|
||||
def message(self, vl):
|
||||
"""Get the notification message."""
|
||||
return vl.message
|
||||
|
||||
def severity(self, vl):
|
||||
"""Get the notification severity and translate to Aodh severity type."""
|
||||
collectd_severity = {self._collectd.NOTIF_FAILURE: 'critical',
|
||||
self._collectd.NOTIF_WARNING: 'moderate',
|
||||
self._collectd.NOTIF_OKAY: 'low',
|
||||
}.get(vl.severity)
|
||||
return collectd_severity
|
||||
|
0
collectd_ceilometer/tests/aodh/__init__.py
Normal file
0
collectd_ceilometer/tests/aodh/__init__.py
Normal file
347
collectd_ceilometer/tests/aodh/test_plugin.py
Normal file
347
collectd_ceilometer/tests/aodh/test_plugin.py
Normal file
@ -0,0 +1,347 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2010-2011 OpenStack Foundation
|
||||
# Copyright (c) 2015 Intel Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Plugin tests."""
|
||||
|
||||
import logging
|
||||
import mock
|
||||
import requests
|
||||
import unittest
|
||||
|
||||
from collectd_ceilometer.aodh import plugin
|
||||
from collectd_ceilometer.aodh import sender
|
||||
from collectd_ceilometer.common.keystone_light import KeystoneException
|
||||
|
||||
Logger = logging.getLoggerClass()
|
||||
|
||||
|
||||
def mock_collectd(**kwargs):
|
||||
"""Return collecd module with collecd logging hooks."""
|
||||
return mock.patch(
|
||||
__name__ + '.' + MockedCollectd.__name__, specs=True,
|
||||
get_dataset=mock.MagicMock(side_effect=Exception),
|
||||
get=mock.MagicMock(), **kwargs)
|
||||
|
||||
|
||||
class MockedCollectd(object):
|
||||
"""Mocked collectd module specifications."""
|
||||
|
||||
def debug(self, record):
|
||||
"""Hook for debug messages."""
|
||||
|
||||
def info(self, record):
|
||||
"""Hook for info messages."""
|
||||
|
||||
def warning(self, record):
|
||||
"""Hook for warning messages."""
|
||||
|
||||
def error(self, record):
|
||||
"""Hook for error messages."""
|
||||
|
||||
def register_init(self, hook):
|
||||
"""Register an hook for init."""
|
||||
|
||||
def register_config(self, hook):
|
||||
"""Register an hook for config."""
|
||||
|
||||
def register_notification(self, hook):
|
||||
"""Register an hook for notification."""
|
||||
|
||||
def register_shutdown(self, hook):
|
||||
"""Register an hook for shutdown."""
|
||||
|
||||
def get_dataset(self, s):
|
||||
"""Get a dataset."""
|
||||
|
||||
def get(self):
|
||||
"""Get notification severity."""
|
||||
|
||||
|
||||
def mock_config(**kwargs):
|
||||
"""Return collecd module with collecd logging hooks."""
|
||||
return mock.patch(
|
||||
__name__ + '.' + MockedConfig.__name__, specs=True,
|
||||
**kwargs)
|
||||
|
||||
|
||||
class MockedConfig(object):
|
||||
"""Mocked config class."""
|
||||
|
||||
|
||||
def mock_value(
|
||||
host='localhost', plugin='cpu', plugin_instance='0',
|
||||
_type='freq', type_instance=None, time=123456789,
|
||||
values=(1234,), **kwargs):
|
||||
"""Create a mock value."""
|
||||
return mock.patch(
|
||||
__name__ + '.' + MockedValue.__name__, specs=True,
|
||||
host=host, plugin=plugin, plugin_instance=plugin_instance, type=_type,
|
||||
type_instance=type_instance, time=time,
|
||||
values=list(values), meta=None, **kwargs)
|
||||
|
||||
|
||||
class MockedValue(object):
|
||||
"""Value used for testing."""
|
||||
|
||||
host = 'localhost'
|
||||
plugin = None
|
||||
plugin_instance = None
|
||||
type = None
|
||||
type_instance = None
|
||||
time = 123456789
|
||||
values = []
|
||||
meta = None
|
||||
|
||||
|
||||
class TestPlugin(unittest.TestCase):
|
||||
"""Test the collectd plugin."""
|
||||
|
||||
@mock.patch.object(plugin, 'Plugin', autospec=True)
|
||||
@mock.patch.object(plugin, 'Config', autospec=True)
|
||||
@mock.patch.object(plugin, 'CollectdLogHandler', autospec=True)
|
||||
@mock.patch.object(plugin, 'ROOT_LOGGER', autospec=True)
|
||||
@mock_collectd()
|
||||
def test_callbacks(
|
||||
self, collectd, ROOT_LOGGER, CollectdLogHandler, Config, Plugin):
|
||||
"""Verify that the callbacks are registered properly."""
|
||||
# When plugin function is called
|
||||
plugin.register_plugin(collectd=collectd)
|
||||
|
||||
# Logger handler is set up
|
||||
ROOT_LOGGER.addHandler.assert_called_once_with(
|
||||
CollectdLogHandler.return_value)
|
||||
ROOT_LOGGER.setLevel.assert_called_once_with(logging.NOTSET)
|
||||
|
||||
# It creates a plugin
|
||||
Plugin.assert_called_once_with(
|
||||
collectd=collectd, config=Config.instance.return_value)
|
||||
|
||||
# callbacks are registered to collectd
|
||||
instance = Plugin.return_value
|
||||
collectd.register_config.assert_called_once_with(instance.config)
|
||||
collectd.register_notification.assert_called_once_with(instance.notify)
|
||||
collectd.register_shutdown.assert_called_once_with(instance.shutdown)
|
||||
|
||||
@mock.patch.object(sender.Sender, '_get_alarm_id', autospec=True)
|
||||
@mock.patch.object(requests, 'put', spec=callable)
|
||||
@mock.patch.object(sender, 'ClientV3', autospec=True)
|
||||
@mock_collectd()
|
||||
@mock_config()
|
||||
@mock_value()
|
||||
def test_notify(self, data, config, collectd, ClientV3, put,
|
||||
_get_alarm_id):
|
||||
"""Test collectd data notifying."""
|
||||
auth_client = ClientV3.return_value
|
||||
auth_client.get_service_endpoint.return_value = \
|
||||
'https://test-aodh.tld'
|
||||
|
||||
put.return_value.status_code = sender.HTTP_CREATED
|
||||
put.return_value.text = 'Updated'
|
||||
|
||||
# Test when the value doesn't get updated
|
||||
_get_alarm_id.return_value = None
|
||||
|
||||
# init instance
|
||||
instance = plugin.Plugin(collectd=collectd, config=config)
|
||||
|
||||
# send the value(doesn't update)
|
||||
instance.notify(data)
|
||||
collectd.error.assert_not_called()
|
||||
|
||||
put.assert_not_called()
|
||||
|
||||
# Test when an alarm will be updated
|
||||
_get_alarm_id.return_value = 'my-alarm-id'
|
||||
|
||||
# send the value(updates)
|
||||
instance.notify(data)
|
||||
collectd.error.assert_not_called()
|
||||
|
||||
# authentication client that has been created
|
||||
ClientV3.assert_called_once()
|
||||
|
||||
# and values that have been sent
|
||||
put.assert_called_once_with(
|
||||
'https://test-aodh.tld' +
|
||||
'/v2/alarms/my-alarm-id/state',
|
||||
data='"insufficient data"',
|
||||
headers={'Content-type': 'application/json',
|
||||
'X-Auth-Token': auth_client.auth_token},
|
||||
timeout=1.0)
|
||||
|
||||
# reset post method
|
||||
put.reset_mock()
|
||||
|
||||
# call shutdown
|
||||
instance.shutdown()
|
||||
|
||||
@mock.patch.object(requests, 'put', spec=callable)
|
||||
@mock.patch.object(sender, 'ClientV3', autospec=True)
|
||||
@mock.patch.object(sender, 'LOGGER', autospec=True)
|
||||
@mock_collectd()
|
||||
@mock_config()
|
||||
@mock_value()
|
||||
def test_notify_auth_failed(
|
||||
self, data, config, collectd, LOGGER, ClientV3, put):
|
||||
"""Test authentication failure."""
|
||||
# tell the auth client to raise an exception
|
||||
ClientV3.side_effect = KeystoneException(
|
||||
"Missing name 'xxx' in received services",
|
||||
"exception",
|
||||
"services list")
|
||||
|
||||
# init instance
|
||||
instance = plugin.Plugin(collectd=collectd, config=config)
|
||||
|
||||
# notify of another value the value
|
||||
instance.notify(data)
|
||||
|
||||
LOGGER.error.assert_called_once_with(
|
||||
"Suspending error logs until successful auth")
|
||||
LOGGER.log.assert_called_once_with(
|
||||
logging.ERROR, "Authentication error: %s",
|
||||
"Missing name 'xxx' in received services\nReason: exception",
|
||||
exc_info=0)
|
||||
|
||||
# no requests method has been called
|
||||
put.assert_not_called()
|
||||
|
||||
@mock.patch.object(sender.Sender, '_perform_post_request', spec=callable)
|
||||
@mock.patch.object(sender, 'ClientV3', autospec=True)
|
||||
@mock_collectd()
|
||||
@mock_config()
|
||||
@mock_value()
|
||||
def test_request_error(
|
||||
self, data, config, collectd, ClientV3, perf_req):
|
||||
"""Test error raised by underlying requests module."""
|
||||
# tell POST request to raise an exception
|
||||
perf_req.side_effect = requests.RequestException('Test POST exception')
|
||||
|
||||
# init instance
|
||||
instance = plugin.Plugin(collectd=collectd, config=config)
|
||||
|
||||
# the value
|
||||
self.assertRaises(requests.RequestException, instance.notify, data)
|
||||
|
||||
@mock.patch.object(sender.Sender, '_get_alarm_id', autospec=True)
|
||||
@mock.patch.object(requests, 'put', spec=callable)
|
||||
@mock.patch.object(sender, 'ClientV3', autospec=True)
|
||||
@mock_collectd()
|
||||
@mock_config()
|
||||
@mock_value()
|
||||
def test_reauthentication(self, data, config, collectd,
|
||||
ClientV3, put, _get_alarm_id):
|
||||
"""Test re-authentication for update request."""
|
||||
# init instance
|
||||
instance = plugin.Plugin(collectd=collectd, config=config)
|
||||
|
||||
# test for non update request
|
||||
_get_alarm_id.return_value = None
|
||||
|
||||
# response returned on success
|
||||
response_ok = requests.Response()
|
||||
response_ok.status_code = requests.codes["OK"]
|
||||
|
||||
# response returned on failure
|
||||
response_unauthorized = requests.Response()
|
||||
response_unauthorized.status_code = requests.codes["UNAUTHORIZED"]
|
||||
|
||||
put.return_value = response_ok
|
||||
|
||||
client = ClientV3.return_value
|
||||
client.auth_token = 'Test auth token'
|
||||
|
||||
# notify of the value
|
||||
instance.notify(data)
|
||||
|
||||
# de-assert the non-update request
|
||||
put.assert_not_called()
|
||||
|
||||
# test for update request
|
||||
_get_alarm_id.return_value = 'my-alarm-id'
|
||||
|
||||
# response returned on success
|
||||
response_ok = requests.Response()
|
||||
response_ok.status_code = requests.codes["OK"]
|
||||
|
||||
# response returned on failure
|
||||
response_unauthorized = requests.Response()
|
||||
response_unauthorized.status_code = requests.codes["UNAUTHORIZED"]
|
||||
|
||||
put.return_value = response_ok
|
||||
|
||||
client = ClientV3.return_value
|
||||
client.auth_token = 'Test auth token'
|
||||
|
||||
# notify of the value
|
||||
instance.notify(data)
|
||||
|
||||
# verify the auth token
|
||||
put.assert_called_once_with(
|
||||
mock.ANY, data=mock.ANY,
|
||||
headers={u'Content-type': mock.ANY,
|
||||
u'X-Auth-Token': 'Test auth token'},
|
||||
timeout=1.0)
|
||||
|
||||
# POST response is unauthorized -> new token needs to be acquired
|
||||
put.side_effect = [response_unauthorized, response_ok]
|
||||
|
||||
# set a new auth token
|
||||
client.auth_token = 'New test auth token'
|
||||
|
||||
instance.notify(data)
|
||||
|
||||
# verify the auth token:
|
||||
call_list = put.call_args_list
|
||||
# POST called three times
|
||||
self.assertEqual(len(call_list), 3)
|
||||
|
||||
# the second call contains the old token
|
||||
token = call_list[1][1]['headers']['X-Auth-Token']
|
||||
self.assertEqual(token, 'Test auth token')
|
||||
# the third call contains the new token
|
||||
token = call_list[2][1]['headers']['X-Auth-Token']
|
||||
self.assertEqual(token, 'New test auth token')
|
||||
|
||||
@mock.patch.object(sender, 'ClientV3', autospec=True)
|
||||
@mock.patch.object(plugin, 'Notifier', autospec=True)
|
||||
@mock.patch.object(plugin, 'LOGGER', autospec=True)
|
||||
@mock_collectd()
|
||||
@mock_config()
|
||||
@mock_value()
|
||||
def test_exception_value_error(self, data, config, collectd,
|
||||
LOGGER, Notifier, ClientV3):
|
||||
"""Test exception raised during notify and shutdown."""
|
||||
notifier = Notifier.return_value
|
||||
notifier.notify.side_effect = ValueError('Test notify error')
|
||||
|
||||
# init instance
|
||||
instance = plugin.Plugin(collectd=collectd, config=config)
|
||||
|
||||
self.assertRaises(ValueError, instance.notify, data)
|
||||
|
||||
@mock.patch.object(sender, 'ClientV3', autospec=True)
|
||||
@mock.patch.object(plugin, 'LOGGER', autospec=True)
|
||||
@mock_collectd()
|
||||
@mock_config()
|
||||
def test_exception_runtime_error(self, config, collectd,
|
||||
LOGGER, ClientV3):
|
||||
"""Test exception raised during shutdown."""
|
||||
# init instance
|
||||
instance = plugin.Plugin(collectd=collectd, config=config)
|
||||
|
||||
instance.shutdown
|
32
etc/collectd.conf.d/collectd-aodh-plugin.conf
Normal file
32
etc/collectd.conf.d/collectd-aodh-plugin.conf
Normal file
@ -0,0 +1,32 @@
|
||||
<LoadPlugin python>
|
||||
Globals true
|
||||
</LoadPlugin>
|
||||
|
||||
<Plugin python>
|
||||
ModulePath "/opt/stack/collectd-ceilometer"
|
||||
LogTraces true
|
||||
Interactive false
|
||||
Import "collectd_ceilometer.aodh.plugin"
|
||||
|
||||
<Module "collectd_ceilometer.aodh.plugin">
|
||||
|
||||
# Verbosity True|False
|
||||
VERBOSE False
|
||||
|
||||
# Service endpoint addresses
|
||||
OS_AUTH_URL "<OS_AUTH_URL>"
|
||||
|
||||
# Ceilometer address
|
||||
#CEILOMETER_URL_TYPE "internalURL"
|
||||
|
||||
# Ceilometer timeout in ms
|
||||
#CEILOMETER_TIMEOUT "1000"
|
||||
|
||||
# # Ceilometer user creds
|
||||
OS_USERNAME "aodh"
|
||||
OS_PASSWORD "password"
|
||||
OS_TENANT_NAME "service"
|
||||
|
||||
</Module>
|
||||
</Plugin>
|
||||
|
@ -0,0 +1,15 @@
|
||||
---
|
||||
prelude: >
|
||||
- The collectd-aodh plugin has been added to the repo. It allows collectd
|
||||
notifications to be sent to Aodh, where an alarm is created. Any
|
||||
notifications are either used to create a new alarm or update an existing
|
||||
alarm. The alarms that are created are all of type event. These alarms
|
||||
will only be updated if a new notification is sent from collectd that
|
||||
will cause a change in state.
|
||||
features:
|
||||
- Provides the collectd-aodh-plugin. This allows collectd notifications
|
||||
to to be sent to Aodh, where it they are used to create corresponding alarms.
|
||||
The notifications sent are used to create new alarms or update existing
|
||||
alarms in Aodh. These alarms are of type event, and updates on these alarms
|
||||
occur if the notification from collectd will cause a change in the alarm
|
||||
state.
|
Loading…
Reference in New Issue
Block a user