From 1933ec8f293cd6594b15a78a4e0fa43263186163 Mon Sep 17 00:00:00 2001 From: Helena McGough Date: Mon, 13 Feb 2017 10:57:27 +0000 Subject: [PATCH] Created the collectd-aodh-plugin - Provided code that creates the collectd-aodh-plugin. Aodh alarms are created based on collectd notifications. Event alarms are created. - Included a .conf file for this change. - Added changes to the base.py file for the meters to facilitate this change. - Included a reno for this change. - Included tests for this change. Change-Id: I7db01df69104aded199a8c43e8c7b433ff549763 --- collectd_ceilometer/aodh/__init__.py | 0 collectd_ceilometer/aodh/notifier.py | 56 +++ collectd_ceilometer/aodh/plugin.py | 85 +++++ collectd_ceilometer/aodh/sender.py | 311 ++++++++++++++++ collectd_ceilometer/common/meters/base.py | 29 +- collectd_ceilometer/tests/aodh/__init__.py | 0 collectd_ceilometer/tests/aodh/test_plugin.py | 347 ++++++++++++++++++ etc/collectd.conf.d/collectd-aodh-plugin.conf | 32 ++ ...collectd-aodh-plugin-35e6c312ef89746e.yaml | 15 + 9 files changed, 867 insertions(+), 8 deletions(-) create mode 100644 collectd_ceilometer/aodh/__init__.py create mode 100644 collectd_ceilometer/aodh/notifier.py create mode 100644 collectd_ceilometer/aodh/plugin.py create mode 100644 collectd_ceilometer/aodh/sender.py create mode 100644 collectd_ceilometer/tests/aodh/__init__.py create mode 100644 collectd_ceilometer/tests/aodh/test_plugin.py create mode 100644 etc/collectd.conf.d/collectd-aodh-plugin.conf create mode 100644 releasenotes/notes/collectd-aodh-plugin-35e6c312ef89746e.yaml diff --git a/collectd_ceilometer/aodh/__init__.py b/collectd_ceilometer/aodh/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/collectd_ceilometer/aodh/notifier.py b/collectd_ceilometer/aodh/notifier.py new file mode 100644 index 0000000..746d094 --- /dev/null +++ b/collectd_ceilometer/aodh/notifier.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Aodh collectd plugin implementation.""" + +from __future__ import unicode_literals + +from collectd_ceilometer.aodh.sender import Sender + +import datetime +import logging + +LOGGER = logging.getLogger(__name__) + + +class Notifier(object): + """Aodh notifier.""" + + def __init__(self, meters, config): + """Initialize Notifier.""" + self._meters = meters + self._sender = Sender() + self._config = config + + def notify(self, vl, data): + """Collect data from collectd.""" + # take the plugin (specialized or default) for parsing the data + notification = self._meters.get(vl.plugin) + # prepare all data related to the sample + metername = notification.meter_name(vl) + message = notification.message(vl) + severity = notification.severity(vl) + resource_id = notification.resource_id(vl) + timestamp = datetime.datetime.utcfromtimestamp(vl.time).isoformat() + + LOGGER.debug( + 'Writing: plugin="%s", message="%s", severity="%s", time="%s', + vl.plugin, message, severity, timestamp) + + self._send_data(metername, severity, resource_id, message) + + def _send_data(self, metername, severity, resource_id, message): + """Send data to Aodh.""" + LOGGER.debug('Sending alarm for %s', metername) + self._sender.send(metername, severity, resource_id, message) diff --git a/collectd_ceilometer/aodh/plugin.py b/collectd_ceilometer/aodh/plugin.py new file mode 100644 index 0000000..7b33ea3 --- /dev/null +++ b/collectd_ceilometer/aodh/plugin.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Aodh collectd plugin.""" + +import logging + +try: + # pylint: disable=import-error + import collectd + # pylint: enable=import-error +except ImportError: + collectd = None # when running unit tests collectd is not avaliable + +import collectd_ceilometer +from collectd_ceilometer.aodh.notifier import Notifier +from collectd_ceilometer.common.logger import CollectdLogHandler +from collectd_ceilometer.common.meters import MeterStorage +from collectd_ceilometer.common.settings import Config + + +LOGGER = logging.getLogger(__name__) +ROOT_LOGGER = logging.getLogger(collectd_ceilometer.__name__) + + +def register_plugin(collectd): + """Bind plugin hooks to collectd and viceversa.""" + config = Config.instance() + + # Setup loggging + log_handler = CollectdLogHandler(collectd=collectd) + log_handler.cfg = config + ROOT_LOGGER.addHandler(log_handler) + ROOT_LOGGER.setLevel(logging.NOTSET) + + # Creates collectd plugin instance + instance = Plugin(collectd=collectd, config=config) + + # Register plugin callbacks + collectd.register_config(instance.config) + collectd.register_shutdown(instance.shutdown) + collectd.register_notification(instance.notify) + + +class Plugin(object): + """Aodh plugin with collectd callbacks.""" + + # NOTE: this is a multithreaded class + + def __init__(self, collectd, config): + """Plugin instance.""" + self._config = config + self._meters = MeterStorage(collectd=collectd) + self._notifier = Notifier(self._meters, config=config) + + def config(self, cfg): + """Configuration callback. + + @param cfg configuration node provided by collectd + """ + self._config.read(cfg) + + def notify(self, vl, data=None): + """Notification callback.""" + LOGGER.info("Notification") + self._notifier.notify(vl, data) + + def shutdown(self): + """Shutdown callback.""" + LOGGER.info("SHUTDOWN") + + +if collectd: + register_plugin(collectd=collectd) diff --git a/collectd_ceilometer/aodh/sender.py b/collectd_ceilometer/aodh/sender.py new file mode 100644 index 0000000..929e943 --- /dev/null +++ b/collectd_ceilometer/aodh/sender.py @@ -0,0 +1,311 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Aodh collectd plugin implementation.""" + +from __future__ import division +from __future__ import unicode_literals + +import collectd_ceilometer +from collectd_ceilometer.common.keystone_light import ClientV3 +from collectd_ceilometer.common.keystone_light import KeystoneException +from collectd_ceilometer.common.settings import Config + +import json +import logging +import requests +from requests.exceptions import RequestException +import six +import threading + +LOGGER = logging.getLogger(__name__) +ROOT_LOGGER = logging.getLogger(collectd_ceilometer.__name__) + +# HTTP status codes +HTTP_CREATED = 201 +HTTP_UNAUTHORIZED = 401 +HTTP_NOT_FOUND = 404 + + +class Sender(object): + """Sends the JSON serialized data to Aodh.""" + + def __init__(self): + """Create the Sender instance. + + The cofiguration must be initialized before the object is created. + """ + self._url_base = None + self._keystone = None + self._auth_token = None + self._auth_lock = threading.Lock() + self._failed_auth = False + self._alarm_ids = {} + self._alarm_names = list() + + def _authenticate(self): + """Authenticate and renew the authentication token.""" + # if auth_token is available, just return it + if self._auth_token is not None: + return self._auth_token + + # aquire the authentication lock + with self._auth_lock: + # re-check the auth_token as another thread could set it + if self._auth_token is not None: + return self._auth_token + + LOGGER.debug('Authenticating request') + # pylint: disable=broad-except + try: + # create a keystone client if it doesn't exist + if self._keystone is None: + cfg = Config.instance() + self._keystone = ClientV3( + auth_url=cfg.OS_AUTH_URL, + username=cfg.OS_USERNAME, + password=cfg.OS_PASSWORD, + tenant_name=cfg.OS_TENANT_NAME + ) + # store the authentication token + self._auth_token = self._keystone.auth_token + + # get the uri of service endpoint + endpoint = self._get_endpoint("aodh") + + self._url_base = "{}/v2/alarms/%s/state".format(endpoint) + + LOGGER.info('Authenticating request - success') + self._failed_auth = False + + except KeystoneException as exc: + log_level = logging.DEBUG + + if not self._failed_auth: + + log_level = logging.ERROR + LOGGER.error( + 'Suspending error logs until successful auth' + ) + + LOGGER.log(log_level, 'Authentication error: %s', + six.text_type(exc), + exc_info=0) + + if exc.response: + LOGGER.debug('Response: %s', exc.response) + + self._auth_token = None + self._failed_auth = True + + return self._auth_token + + def send(self, metername, severity, resource_id, message): + """Send the payload to Aodh.""" + # get the auth_token + auth_token = self._authenticate() + LOGGER.info('Auth_token: %s', + auth_token, + ) + # if auth_token is not set, there is nothing to do + if auth_token is None: + LOGGER.debug('Unable to send data. Not authenticated') + return + + if self._url_base is None: + LOGGER.debug( + 'Unable to send data. Missing endpoint from ident server') + return + + # Create alarm name + alarm_name = self._get_alarm_name(metername, resource_id) + + # check for and/or get alarm_id + alarm_id = self._get_alarm_id(alarm_name, severity, metername, message) + + if alarm_id is not None: + result = self._update_alarm(alarm_id, message, auth_token) + else: + result = None + + if result is None: + return + + LOGGER.info('Result: %s %s', + six.text_type(result.status_code), + result.text) + + # if the request failed due to an auth error + if result.status_code == HTTP_UNAUTHORIZED: + # reset the auth token in order to force the subsequent + # _authenticate() call to renew it + # Here, it can happen that the token is reset right after + # another thread has finished the authentication and thus + # the authentication may be performed twice + self._auth_token = None + + # renew the authentication token + auth_token = self._authenticate() + + if auth_token is not None: + if alarm_id is not None: + result = self._update_alarm(alarm_id, message, auth_token) + else: + result = None + + if result.status_code == HTTP_NOT_FOUND: + LOGGER.debug("Received 404 error when submitting %s sample, \ + creating a new metric", + alarm_name) + + # check for and/or get alarm_id + alarm_id = self._get_alarm_id(alarm_name, severity, + metername, message) + + LOGGER.info('alarmname: %s, alarm_id: %s', alarm_name, alarm_id) + if alarm_id is not None: + result = self._update_alarm(alarm_id, message, auth_token) + else: + result = None + + if result.status_code == HTTP_CREATED: + LOGGER.debug('Result: %s', HTTP_CREATED) + else: + LOGGER.info('Result: %s %s', + result.status_code, + result.text) + + def _get_endpoint(self, service): + """Get the uri of service endpoint.""" + endpoint = self._keystone.get_service_endpoint( + service, + Config.instance().CEILOMETER_URL_TYPE) + return endpoint + + def _get_alarm_id(self, alarm_name, severity, metername, message): + """Check for existing alarm and its id or create a new one.""" + try: + return self._alarm_ids[alarm_name] + + except KeyError as ke: + LOGGER.warn(ke) + + endpoint = self._get_endpoint("aodh") + if alarm_name not in self._alarm_names: + LOGGER.warn('No known ID for %s', alarm_name) + self._alarm_names.append(alarm_name) + self._alarm_ids[alarm_name] = \ + self._create_alarm(endpoint, severity, + metername, alarm_name, message) + return None + + def _create_alarm(self, endpoint, severity, metername, + alarm_name, message): + """Create a new alarm with a new alarm_id.""" + url = "{}/v2/alarms/".format(endpoint) + + rule = {'event_type': metername, } + payload = json.dumps({'state': self._get_alarm_state(message), + 'name': alarm_name, + 'severity': severity, + 'type': "event", + 'event_rule': rule, + }) + + result = self._perform_post_request(url, payload, self._auth_token) + alarm_id = json.loads(result.text)['alarm_id'] + LOGGER.debug("alarm_id=%s", alarm_id) + return alarm_id + + def _get_alarm_state(self, message): + """Get the state of the alarm.""" + message = message.split() + if 'above' in message: + alarm_state = "alarm" + elif 'within' in message: + alarm_state = "ok" + else: + alarm_state = "insufficient data" + return alarm_state + + def _get_alarm_name(self, metername, resource_id): + """Get the alarm name.""" + alarm_name = metername + "(" + resource_id + ")" + return alarm_name + + def _update_alarm(self, alarm_id, message, auth_token): + """Perform the alarm update.""" + url = self._url_base % (alarm_id) + # create the payload and update the state of the alarm + payload = json.dumps(self._get_alarm_state(message)) + return self._perform_update_request(url, auth_token, payload) + + @classmethod + def _perform_post_request(cls, url, payload, auth_token): + """Perform the POST request.""" + LOGGER.debug('Performing request to %s', url) + + # request headers + headers = {'X-Auth-Token': auth_token, + 'Content-type': 'application/json'} + # perform request and return its result + response = None + try: + LOGGER.debug( + "Performing request to: %s with data=%s and headers=%s", + url, payload, headers) + + response = requests.post( + url, data=payload, headers=headers, + timeout=(Config.instance().CEILOMETER_TIMEOUT / 1000.)) + LOGGER.info('Response: %s: %s', + response.status_code, response.text + ) + except RequestException as exc: + LOGGER.error('aodh request error: %s', six.text_type(exc)) + finally: + if response is not None: + LOGGER.debug( + "Returning response from _perform_post_request(): %s", + response.status_code) + return response + + @classmethod + def _perform_update_request(cls, url, auth_token, payload): + """Perform the PUT/update request.""" + LOGGER.debug('Performing request to %s', url) + + # request headers + headers = {'X-Auth-Token': auth_token, + 'Content-type': 'application/json'} + # perform request and return its result + response = None + try: + LOGGER.debug( + "Performing request to: %s with data=%s and headers=%s", + url, payload, headers) + response = requests.put( + url, data=payload, headers=headers, + timeout=(Config.instance().CEILOMETER_TIMEOUT / 1000.)) + LOGGER.info('Response: %s: %s', + response.status_code, response.text + ) + except RequestException as exc: + LOGGER.error('aodh request error: %s', six.text_type(exc)) + finally: + if response is not None: + LOGGER.debug( + 'Returning response from _perform_update_request(): %s', + response.status_code) + return response diff --git a/collectd_ceilometer/common/meters/base.py b/collectd_ceilometer/common/meters/base.py index 3b80317..8a94d01 100644 --- a/collectd_ceilometer/common/meters/base.py +++ b/collectd_ceilometer/common/meters/base.py @@ -11,7 +11,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Default collectd meter""" + +"""Default collectd meter.""" from __future__ import unicode_literals @@ -22,35 +23,35 @@ LOGGER = logging.getLogger(__name__) class Meter(object): - """Default collectd meter""" + """Default collectd meter.""" def __init__(self, collectd): + """Instantiate meter instance.""" self._collectd = collectd def meter_name(self, vl): - """Return meter name""" + """Return meter name.""" # pylint: disable=no-self-use resources = [vl.plugin, vl.type] return '.'.join([i for i in resources if i]) def hostname(self, vl): - """Get host name""" + """Get host name.""" # pylint: disable=no-self-use return vl.host def resource_id(self, vl): - """Get resource ID""" - + """Get resource ID.""" resources = [self.hostname(vl), vl.plugin_instance, vl.type_instance] return '-'.join([i for i in resources if i]) def unit(self, vl): - """Get meter unit""" + """Get meter unit.""" # pylint: disable=no-self-use return Config.instance().unit(vl.plugin, vl.type) def sample_type(self, vl): - """Translate from collectd counter type to Ceilometer type""" + """Translate from collectd counter type to Ceilometer type.""" types = {"gauge": "gauge", "derive": "delta", "absolute": "cumulative", @@ -66,3 +67,15 @@ class Meter(object): collectd_type = "gauge" return types[collectd_type] + + def message(self, vl): + """Get the notification message.""" + return vl.message + + def severity(self, vl): + """Get the notification severity and translate to Aodh severity type.""" + collectd_severity = {self._collectd.NOTIF_FAILURE: 'critical', + self._collectd.NOTIF_WARNING: 'moderate', + self._collectd.NOTIF_OKAY: 'low', + }.get(vl.severity) + return collectd_severity diff --git a/collectd_ceilometer/tests/aodh/__init__.py b/collectd_ceilometer/tests/aodh/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/collectd_ceilometer/tests/aodh/test_plugin.py b/collectd_ceilometer/tests/aodh/test_plugin.py new file mode 100644 index 0000000..d8d9f53 --- /dev/null +++ b/collectd_ceilometer/tests/aodh/test_plugin.py @@ -0,0 +1,347 @@ +# -*- coding: utf-8 -*- + +# Copyright 2010-2011 OpenStack Foundation +# Copyright (c) 2015 Intel Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Plugin tests.""" + +import logging +import mock +import requests +import unittest + +from collectd_ceilometer.aodh import plugin +from collectd_ceilometer.aodh import sender +from collectd_ceilometer.common.keystone_light import KeystoneException + +Logger = logging.getLoggerClass() + + +def mock_collectd(**kwargs): + """Return collecd module with collecd logging hooks.""" + return mock.patch( + __name__ + '.' + MockedCollectd.__name__, specs=True, + get_dataset=mock.MagicMock(side_effect=Exception), + get=mock.MagicMock(), **kwargs) + + +class MockedCollectd(object): + """Mocked collectd module specifications.""" + + def debug(self, record): + """Hook for debug messages.""" + + def info(self, record): + """Hook for info messages.""" + + def warning(self, record): + """Hook for warning messages.""" + + def error(self, record): + """Hook for error messages.""" + + def register_init(self, hook): + """Register an hook for init.""" + + def register_config(self, hook): + """Register an hook for config.""" + + def register_notification(self, hook): + """Register an hook for notification.""" + + def register_shutdown(self, hook): + """Register an hook for shutdown.""" + + def get_dataset(self, s): + """Get a dataset.""" + + def get(self): + """Get notification severity.""" + + +def mock_config(**kwargs): + """Return collecd module with collecd logging hooks.""" + return mock.patch( + __name__ + '.' + MockedConfig.__name__, specs=True, + **kwargs) + + +class MockedConfig(object): + """Mocked config class.""" + + +def mock_value( + host='localhost', plugin='cpu', plugin_instance='0', + _type='freq', type_instance=None, time=123456789, + values=(1234,), **kwargs): + """Create a mock value.""" + return mock.patch( + __name__ + '.' + MockedValue.__name__, specs=True, + host=host, plugin=plugin, plugin_instance=plugin_instance, type=_type, + type_instance=type_instance, time=time, + values=list(values), meta=None, **kwargs) + + +class MockedValue(object): + """Value used for testing.""" + + host = 'localhost' + plugin = None + plugin_instance = None + type = None + type_instance = None + time = 123456789 + values = [] + meta = None + + +class TestPlugin(unittest.TestCase): + """Test the collectd plugin.""" + + @mock.patch.object(plugin, 'Plugin', autospec=True) + @mock.patch.object(plugin, 'Config', autospec=True) + @mock.patch.object(plugin, 'CollectdLogHandler', autospec=True) + @mock.patch.object(plugin, 'ROOT_LOGGER', autospec=True) + @mock_collectd() + def test_callbacks( + self, collectd, ROOT_LOGGER, CollectdLogHandler, Config, Plugin): + """Verify that the callbacks are registered properly.""" + # When plugin function is called + plugin.register_plugin(collectd=collectd) + + # Logger handler is set up + ROOT_LOGGER.addHandler.assert_called_once_with( + CollectdLogHandler.return_value) + ROOT_LOGGER.setLevel.assert_called_once_with(logging.NOTSET) + + # It creates a plugin + Plugin.assert_called_once_with( + collectd=collectd, config=Config.instance.return_value) + + # callbacks are registered to collectd + instance = Plugin.return_value + collectd.register_config.assert_called_once_with(instance.config) + collectd.register_notification.assert_called_once_with(instance.notify) + collectd.register_shutdown.assert_called_once_with(instance.shutdown) + + @mock.patch.object(sender.Sender, '_get_alarm_id', autospec=True) + @mock.patch.object(requests, 'put', spec=callable) + @mock.patch.object(sender, 'ClientV3', autospec=True) + @mock_collectd() + @mock_config() + @mock_value() + def test_notify(self, data, config, collectd, ClientV3, put, + _get_alarm_id): + """Test collectd data notifying.""" + auth_client = ClientV3.return_value + auth_client.get_service_endpoint.return_value = \ + 'https://test-aodh.tld' + + put.return_value.status_code = sender.HTTP_CREATED + put.return_value.text = 'Updated' + + # Test when the value doesn't get updated + _get_alarm_id.return_value = None + + # init instance + instance = plugin.Plugin(collectd=collectd, config=config) + + # send the value(doesn't update) + instance.notify(data) + collectd.error.assert_not_called() + + put.assert_not_called() + + # Test when an alarm will be updated + _get_alarm_id.return_value = 'my-alarm-id' + + # send the value(updates) + instance.notify(data) + collectd.error.assert_not_called() + + # authentication client that has been created + ClientV3.assert_called_once() + + # and values that have been sent + put.assert_called_once_with( + 'https://test-aodh.tld' + + '/v2/alarms/my-alarm-id/state', + data='"insufficient data"', + headers={'Content-type': 'application/json', + 'X-Auth-Token': auth_client.auth_token}, + timeout=1.0) + + # reset post method + put.reset_mock() + + # call shutdown + instance.shutdown() + + @mock.patch.object(requests, 'put', spec=callable) + @mock.patch.object(sender, 'ClientV3', autospec=True) + @mock.patch.object(sender, 'LOGGER', autospec=True) + @mock_collectd() + @mock_config() + @mock_value() + def test_notify_auth_failed( + self, data, config, collectd, LOGGER, ClientV3, put): + """Test authentication failure.""" + # tell the auth client to raise an exception + ClientV3.side_effect = KeystoneException( + "Missing name 'xxx' in received services", + "exception", + "services list") + + # init instance + instance = plugin.Plugin(collectd=collectd, config=config) + + # notify of another value the value + instance.notify(data) + + LOGGER.error.assert_called_once_with( + "Suspending error logs until successful auth") + LOGGER.log.assert_called_once_with( + logging.ERROR, "Authentication error: %s", + "Missing name 'xxx' in received services\nReason: exception", + exc_info=0) + + # no requests method has been called + put.assert_not_called() + + @mock.patch.object(sender.Sender, '_perform_post_request', spec=callable) + @mock.patch.object(sender, 'ClientV3', autospec=True) + @mock_collectd() + @mock_config() + @mock_value() + def test_request_error( + self, data, config, collectd, ClientV3, perf_req): + """Test error raised by underlying requests module.""" + # tell POST request to raise an exception + perf_req.side_effect = requests.RequestException('Test POST exception') + + # init instance + instance = plugin.Plugin(collectd=collectd, config=config) + + # the value + self.assertRaises(requests.RequestException, instance.notify, data) + + @mock.patch.object(sender.Sender, '_get_alarm_id', autospec=True) + @mock.patch.object(requests, 'put', spec=callable) + @mock.patch.object(sender, 'ClientV3', autospec=True) + @mock_collectd() + @mock_config() + @mock_value() + def test_reauthentication(self, data, config, collectd, + ClientV3, put, _get_alarm_id): + """Test re-authentication for update request.""" + # init instance + instance = plugin.Plugin(collectd=collectd, config=config) + + # test for non update request + _get_alarm_id.return_value = None + + # response returned on success + response_ok = requests.Response() + response_ok.status_code = requests.codes["OK"] + + # response returned on failure + response_unauthorized = requests.Response() + response_unauthorized.status_code = requests.codes["UNAUTHORIZED"] + + put.return_value = response_ok + + client = ClientV3.return_value + client.auth_token = 'Test auth token' + + # notify of the value + instance.notify(data) + + # de-assert the non-update request + put.assert_not_called() + + # test for update request + _get_alarm_id.return_value = 'my-alarm-id' + + # response returned on success + response_ok = requests.Response() + response_ok.status_code = requests.codes["OK"] + + # response returned on failure + response_unauthorized = requests.Response() + response_unauthorized.status_code = requests.codes["UNAUTHORIZED"] + + put.return_value = response_ok + + client = ClientV3.return_value + client.auth_token = 'Test auth token' + + # notify of the value + instance.notify(data) + + # verify the auth token + put.assert_called_once_with( + mock.ANY, data=mock.ANY, + headers={u'Content-type': mock.ANY, + u'X-Auth-Token': 'Test auth token'}, + timeout=1.0) + + # POST response is unauthorized -> new token needs to be acquired + put.side_effect = [response_unauthorized, response_ok] + + # set a new auth token + client.auth_token = 'New test auth token' + + instance.notify(data) + + # verify the auth token: + call_list = put.call_args_list + # POST called three times + self.assertEqual(len(call_list), 3) + + # the second call contains the old token + token = call_list[1][1]['headers']['X-Auth-Token'] + self.assertEqual(token, 'Test auth token') + # the third call contains the new token + token = call_list[2][1]['headers']['X-Auth-Token'] + self.assertEqual(token, 'New test auth token') + + @mock.patch.object(sender, 'ClientV3', autospec=True) + @mock.patch.object(plugin, 'Notifier', autospec=True) + @mock.patch.object(plugin, 'LOGGER', autospec=True) + @mock_collectd() + @mock_config() + @mock_value() + def test_exception_value_error(self, data, config, collectd, + LOGGER, Notifier, ClientV3): + """Test exception raised during notify and shutdown.""" + notifier = Notifier.return_value + notifier.notify.side_effect = ValueError('Test notify error') + + # init instance + instance = plugin.Plugin(collectd=collectd, config=config) + + self.assertRaises(ValueError, instance.notify, data) + + @mock.patch.object(sender, 'ClientV3', autospec=True) + @mock.patch.object(plugin, 'LOGGER', autospec=True) + @mock_collectd() + @mock_config() + def test_exception_runtime_error(self, config, collectd, + LOGGER, ClientV3): + """Test exception raised during shutdown.""" + # init instance + instance = plugin.Plugin(collectd=collectd, config=config) + + instance.shutdown diff --git a/etc/collectd.conf.d/collectd-aodh-plugin.conf b/etc/collectd.conf.d/collectd-aodh-plugin.conf new file mode 100644 index 0000000..35e29d9 --- /dev/null +++ b/etc/collectd.conf.d/collectd-aodh-plugin.conf @@ -0,0 +1,32 @@ + + Globals true + + + + ModulePath "/opt/stack/collectd-ceilometer" + LogTraces true + Interactive false + Import "collectd_ceilometer.aodh.plugin" + + + + # Verbosity True|False + VERBOSE False + + # Service endpoint addresses + OS_AUTH_URL "" + + # Ceilometer address + #CEILOMETER_URL_TYPE "internalURL" + + # Ceilometer timeout in ms + #CEILOMETER_TIMEOUT "1000" + + # # Ceilometer user creds + OS_USERNAME "aodh" + OS_PASSWORD "password" + OS_TENANT_NAME "service" + + + + diff --git a/releasenotes/notes/collectd-aodh-plugin-35e6c312ef89746e.yaml b/releasenotes/notes/collectd-aodh-plugin-35e6c312ef89746e.yaml new file mode 100644 index 0000000..9721bc8 --- /dev/null +++ b/releasenotes/notes/collectd-aodh-plugin-35e6c312ef89746e.yaml @@ -0,0 +1,15 @@ +--- +prelude: > + - The collectd-aodh plugin has been added to the repo. It allows collectd + notifications to be sent to Aodh, where an alarm is created. Any + notifications are either used to create a new alarm or update an existing + alarm. The alarms that are created are all of type event. These alarms + will only be updated if a new notification is sent from collectd that + will cause a change in state. +features: + - Provides the collectd-aodh-plugin. This allows collectd notifications + to to be sent to Aodh, where it they are used to create corresponding alarms. + The notifications sent are used to create new alarms or update existing + alarms in Aodh. These alarms are of type event, and updates on these alarms + occur if the notification from collectd will cause a change in the alarm + state.