Refactored notification engine types

Added more test coverage for email notifications.

Moved notification methods from notification_processor to new types directory.

types/interface.py now automatically records statsd information for configured
types.

Configuration for a type is now optional.  Uncofigured types are not able to
send notifications.  Notifications that come across for unconfigured types
generate a warning.

Email notification type now explicity exits when unable to connect to an SMTP
server.

Change-Id: I213d815965761736eb3680b5e14206ba7bef7e90
This commit is contained in:
Joe Keen 2015-01-15 14:09:22 -07:00
parent 45a2411ebb
commit ebc1ed72d6
14 changed files with 1570 additions and 541 deletions

View File

@ -156,9 +156,7 @@ def main(argv=None):
notifications,
sent_notifications,
finished,
config['email'],
config['webhook'],
config['pagerduty']
config['notification_types']
).run),
)
processors.extend(notification_processors)

View File

@ -13,16 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import email.mime.text
import json
import logging
import monascastatsd
import requests
import smtplib
import sys
import time
from monasca_notification.processors.base import BaseProcessor
from monasca_notification.types import notifiers
log = logging.getLogger(__name__)
@ -30,207 +25,43 @@ log = logging.getLogger(__name__)
class NotificationProcessor(BaseProcessor):
def __init__(self, notification_queue,
sent_notification_queue, finished_queue,
email_config, webhook_config, pagerduty_config):
sent_notification_queue, finished_queue, config):
self.notification_queue = notification_queue
self.sent_notification_queue = sent_notification_queue
self.finished_queue = finished_queue
self.email_config = email_config
self.webhook_config = {'timeout': 5}
self.webhook_config.update(webhook_config)
self.pagerduty_config = {
'timeout': 5,
'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'}
self.pagerduty_config.update(pagerduty_config)
self.smtp = None
self._smtp_connect()
# Types as key, method used to process that type as value
self.notification_types = {'email': self._send_email,
'webhook': self._post_webhook,
'pagerduty': self._post_pagerduty}
self.statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions)
def _create_msg(self, hostname, notification):
"""Create two kind of messages:
1. Notifications that include metrics with a hostname as a dimension. There may be more than one hostname.
We will only report the hostname if there is only one.
2. Notifications that do not include metrics and therefore no hostname. Example: API initiated changes.
* A third notification type which include metrics but do not include a hostname will
be treated as type #2.
"""
if len(hostname) == 1: # Type 1
msg = email.mime.text.MIMEText("On host \"%s\" %s\n\nAlarm \"%s\" transitioned to the %s state at %s UTC"
% (hostname[0],
notification.message.lower(),
notification.alarm_name,
notification.state,
time.asctime(time.gmtime(notification.alarm_timestamp))) +
"\nalarm_id: %s" % notification.alarm_id)
msg['Subject'] = "%s \"%s\" for Host: %s" % (notification.state, notification.alarm_name, hostname[0])
else: # Type 2
msg = email.mime.text.MIMEText("%s\n\nAlarm \"%s\" transitioned to the %s state at %s UTC\nAlarm_id: %s"
% (notification.message,
notification.alarm_name,
notification.state,
time.asctime(time.gmtime(notification.alarm_timestamp)),
notification.alarm_id))
msg['Subject'] = "%s \"%s\" " % (notification.state, notification.alarm_name)
msg['From'] = self.email_config['from_addr']
msg['To'] = notification.address
return msg
def _send_email(self, notification):
"""Send the notification via email
Returns the notification upon success, None upon failure
"""
# Get the "hostname" from the notification metrics if there is one
hostname = []
for metric in notification.metrics:
for dimension in metric['dimensions']:
if 'hostname' in dimension:
if not metric['dimensions']['%s' % dimension] in hostname[:]:
hostname.append(metric['dimensions']['%s' % dimension])
# Generate the message
msg = self._create_msg(hostname, notification)
# email the notification
try:
self.smtp.sendmail(self.email_config['from_addr'], notification.address, msg.as_string())
log.debug('Sent email to %s, notification %s' % (notification.address, notification.to_json()))
except smtplib.SMTPServerDisconnected:
log.debug('SMTP server disconnected. Will reconnect and retry message.')
self._smtp_connect()
try:
self.smtp.sendmail(self.email_config['from_addr'], notification.address, msg.as_string())
log.debug('Sent email to %s, notification %s' % (notification.address, notification.to_json()))
except smtplib.SMTPException as e:
log.error("Error sending Email Notification:%s\nError:%s" % (notification.to_json(), e))
except smtplib.SMTPException as e:
log.error("Error sending Email Notification:%s\nError:%s" % (notification.to_json(), e))
else:
return notification
def _smtp_connect(self):
"""Connect to the smtp server
"""
log.info('Connecting to Email Server %s' % self.email_config['server'])
smtp = smtplib.SMTP(
self.email_config['server'], self.email_config['port'], timeout=self.email_config['timeout'])
if self.email_config['user'] is not None:
smtp.login(self.email_config['user'], self.email_config['password'])
self.smtp = smtp
def _post_webhook(self, notification):
"""Send the notification via webhook
Posts on the given url
"""
log.info(
"Notifying alarm %(alarm_id)s to %(current)s with action %(action)s" %
({'alarm_id': notification.alarm_name,
'current': notification.state,
'action': notification.address}))
body = {'alarm_id': notification.alarm_id}
headers = {'content-type': 'application/json'}
url = notification.address
try:
# Posting on the given URL
result = requests.post(url=url,
data=body,
headers=headers,
timeout=self.webhook_config['timeout'])
if result.status_code in range(200, 300):
log.info("Notification successfully posted.")
return notification
else:
log.error("Received an HTTP code %s when trying to post on URL %s." % (result.status_code, url))
except:
log.error("Error trying to post on URL %s: %s." % (url, sys.exc_info()[0]))
def _post_pagerduty(self, notification):
"""Send pagerduty notification
"""
url = self.pagerduty_config['url']
headers = {"content-type": "application/json"}
body = {"service_key": notification.address,
"event_type": "trigger",
"description": notification.message,
"client": "Monasca",
"client_url": "",
"details": {"alarm_id": notification.alarm_id,
"alarm_name": notification.alarm_name,
"current": notification.state,
"message": notification.message}}
try:
result = requests.post(url=url,
data=json.dumps(body),
headers=headers,
timeout=self.pagerduty_config['timeout'])
valid_http_codes = [200, 201, 204]
if result.status_code in valid_http_codes:
return notification
log.error("Error with pagerduty request. key=<%s> response=%s"
% (notification.address, result.status_code))
except:
log.error("Exception on pagerduty request. key=<%s> exception=%s"
% (notification.address, sys.exc_info()[0]))
self.config = config
def run(self):
"""Send the notifications
For each notification in a message it is sent according to its type.
If all notifications fail the alarm partition/offset are added to the the finished queue
"""
counters = {'email': self.statsd.get_counter(name='sent_smtp_count'),
'webhook': self.statsd.get_counter(name='sent_webhook_count'),
'pagerduty': self.statsd.get_counter(name='sent_pagerduty_count')}
timers = {'email': self.statsd.get_timer(),
'webhook': self.statsd.get_timer(),
'pagerduty': self.statsd.get_timer()}
notifiers.init(self.statsd)
notifiers.config(self.config)
invalid_type_count = self.statsd.get_counter(name='invalid_type_count')
sent_failed_count = self.statsd.get_counter(name='sent_failed_count')
while True:
notifications = self.notification_queue.get()
sent_notifications = []
for notification in notifications:
if notification.type not in self.notification_types:
log.warn('Notification type %s is not a valid type' % notification.type)
invalid_type_count += 1
continue
else:
timer_name = notification.type + '_time'
with timers[notification.type].time(timer_name):
sent = self.notification_types[notification.type](notification)
sent, failed, invalid = notifiers.send_notifications(notifications)
if sent is None:
sent_failed_count += 1
else:
sent.notification_timestamp = time.time()
sent_notifications.append(sent)
counters[notification.type] += 1
if len(sent_notifications) == 0: # All notifications failed
self._add_to_queue(
self.finished_queue, 'finished', (notifications[0].src_partition, notifications[0].src_offset))
else:
self._add_to_queue(self.sent_notification_queue, 'sent_notification', sent_notifications)
if failed > 0:
sent_failed_count.increment(failed)
if invalid > 0:
invalid_type_count.increment(invalid)
if sent:
self._add_to_queue(self.sent_notification_queue,
'sent_notification',
sent)
else: # All notifications failed
self._add_to_queue(self.finished_queue,
'finished',
(notifications[0].src_partition, notifications[0].src_offset))

View File

View File

@ -0,0 +1,40 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AbstractNotifier(object):
def __init__(self):
pass
@abc.abstractproperty
def type(self):
pass
@abc.abstractproperty
def statsd_name(self):
pass
@abc.abstractmethod
def config(self, config):
pass
@abc.abstractmethod
def send_notification(self, notification):
pass

View File

@ -0,0 +1,145 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import email.mime.text
import smtplib
import sys
import time
from abstract_notifier import AbstractNotifier
class EmailNotifier(AbstractNotifier):
def __init__(self, log):
self._log = log
self._smtp = None
def config(self, config):
self._config = config
self._smtp_connect()
@property
def type(self):
return "email"
@property
def statsd_name(self):
return "sent_smtp_count"
def send_notification(self, notification):
"""Send the notification via email
Returns the True upon success, False upon failure
"""
# Get the "hostname" from the notification metrics if there is one
hostname = []
for metric in notification.metrics:
for dimension in metric['dimensions']:
if 'hostname' in dimension:
if not metric['dimensions'][dimension] in hostname:
hostname.append(metric['dimensions'][dimension])
# Generate the message
msg = self._create_msg(hostname, notification)
try:
self._sendmail(notification, msg)
return True
except smtplib.SMTPServerDisconnected:
self._log.warn('SMTP server disconnected. '
'Will reconnect and retry message.')
self._smtp_connect()
except smtplib.SMTPException:
self._email_error(notification)
return False
try:
self._sendmail(notification, msg)
return True
except smtplib.SMTPException:
self._email_error(notification)
return False
def _sendmail(self, notification, msg):
self._smtp.sendmail(self._config['from_addr'], notification.address, msg.as_string())
self._log.debug("Sent email to {}, notification {}".format(notification.address, notification.to_json()))
def _email_error(self, notification):
self._log.exception("Error sending Email Notification")
self._log.error("Failed email: {}".format(notification.to_json()))
def _smtp_connect(self):
"""Connect to the smtp server
"""
self._log.info("Connecting to Email Server {}".format(self._config['server']))
try:
smtp = smtplib.SMTP(self._config['server'],
self._config['port'],
timeout=self._config['timeout'])
if self._config['user'] is not None:
smtp.login(self._config['user'], self.config['password'])
self._smtp = smtp
except Exception:
self._log.exception("Unable to connect to email server. Exiting.")
sys.exit(1)
def _create_msg(self, hostname, notification):
"""Create two kind of messages:
1. Notifications that include metrics with a hostname as a dimension. There may be more than one hostname.
We will only report the hostname if there is only one.
2. Notifications that do not include metrics and therefore no hostname. Example: API initiated changes.
* A third notification type which include metrics but do not include a hostname will
be treated as type #2.
"""
timestamp = time.asctime(time.gmtime(notification.alarm_timestamp))
if len(hostname) == 1: # Type 1
text = '''On host "{}" {}
Alarm "{}" transitioned to the {} state at {} UTC
alarm_id: {}'''.format(hostname[0],
notification.message.lower(),
notification.alarm_name,
notification.state,
timestamp,
notification.alarm_id)
msg = email.mime.text.MIMEText(text)
msg['Subject'] = '{} "{}" for Host: {}'.format(notification.state,
notification.alarm_name,
hostname[0])
else: # Type 2
text = '''{}
Alarm "{}" transitioned to the {} state at {} UTC
Alarm_id: {}'''.format(notification.message,
notification.alarm_name,
notification.state,
timestamp,
notification.alarm_id)
msg = email.mime.text.MIMEText(text)
msg['Subject'] = '{} "{}" '.format(notification.state, notification.alarm_name)
msg['From'] = self._config['from_addr']
msg['To'] = notification.address
return msg

View File

@ -0,0 +1,96 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from monasca_notification.types import email_notifier
from monasca_notification.types import pagerduty_notifier
from monasca_notification.types import webhook_notifier
log = logging.getLogger(__name__)
possible_notifiers = []
configured_notifiers = {}
statsd_counter = {}
statsd = None
statsd_timer = None
def init(statsd_obj):
global statsd, statsd_timer
statsd = statsd_obj
statsd_timer = statsd.get_timer()
possible_notifiers.append(email_notifier.EmailNotifier(log))
possible_notifiers.append(webhook_notifier.WebhookNotifier(log))
possible_notifiers.append(pagerduty_notifier.PagerdutyNotifier(log))
def enabled_notifications():
results = []
for key in configured_notifiers:
results.append(key)
return results
def config(config):
for notifier in possible_notifiers:
ntype = notifier.type
if ntype in config:
try:
notifier.config(config[ntype])
configured_notifiers[ntype] = notifier
statsd_counter[ntype] = statsd.get_counter(notifier.statsd_name)
log.info("{} notification ready".format(ntype))
except Exception:
log.exception("config exception for {}".format(ntype))
else:
log.warn("No config data for type: {}".format(ntype))
def send_notifications(notifications):
sent = []
failed_count = 0
invalid_count = 0
for notification in notifications:
ntype = notification.type
if ntype not in configured_notifiers:
log.warn("attempting to send unconfigured notification: {}".format(ntype))
invalid_count += 1
continue
with statsd_timer.time(ntype + '_time'):
result = send_single_notification(notification)
if result:
notification.notification_timestamp = time.time()
sent.append(notification)
statsd_counter[ntype].increment(1)
else:
failed_count += 1
return (sent, failed_count, invalid_count)
def send_single_notification(notification):
ntype = notification.type
try:
return configured_notifiers[ntype].send_notification(notification)
except Exception:
log.exception("send_notification exception for {}".format(ntype))
return False

View File

@ -0,0 +1,73 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
from abstract_notifier import AbstractNotifier
VALID_HTTP_CODES = [200, 201, 204]
class PagerdutyNotifier(AbstractNotifier):
def __init__(self, log):
self._log = log
def config(self, config):
self._config = {
'timeout': 5,
'url': 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'}
self._config.update(config)
@property
def type(self):
return "pagerduty"
@property
def statsd_name(self):
return 'sent_pagerduty_count'
def send_notification(self, notification):
"""Send pagerduty notification
"""
url = self._config['url']
headers = {"content-type": "application/json"}
body = {"service_key": notification.address,
"event_type": "trigger",
"description": notification.message,
"client": "Monasca",
"client_url": "",
"details": {"alarm_id": notification.alarm_id,
"alarm_name": notification.alarm_name,
"current": notification.state,
"message": notification.message}}
try:
result = requests.post(url=url,
data=json.dumps(body),
headers=headers,
timeout=self._config['timeout'])
if result.status_code in VALID_HTTP_CODES:
return True
self._log.error("Error with pagerduty request. key=<{}> response={}"
.format(notification.address, result.status_code))
return False
except Exception:
self._log.exception("Exception on pagerduty request. key=<{}>"
.format(notification.address))
return False

View File

@ -0,0 +1,68 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from abstract_notifier import AbstractNotifier
class WebhookNotifier(AbstractNotifier):
def __init__(self, log):
self._log = log
def config(self, config_dict):
self._config = {'timeout': 5}
self._config.update(config_dict)
@property
def type(self):
return "webhook"
@property
def statsd_name(self):
return 'sent_webhook_count'
def send_notification(self, notification):
"""Send the notification via webhook
Posts on the given url
"""
self._log.info("Notifying alarm {} to {} with action {}"
.format(notification.alarm_name,
notification.state,
notification.address))
body = {'alarm_id': notification.alarm_id}
headers = {'content-type': 'application/json'}
url = notification.address
try:
# Posting on the given URL
result = requests.post(url=url,
data=body,
headers=headers,
timeout=self._config['timeout'])
if result.status_code in range(200, 300):
self._log.info("Notification successfully posted.")
return True
else:
self._log.error("Received an HTTP code {} when trying to post on URL {}."
.format(result.status_code, url))
return False
except Exception:
self._log.exception("Error trying to post on URL {}".format(url))
return False

View File

@ -13,20 +13,21 @@ mysql:
# A dictionary set according to the params defined in, http://dev.mysql.com/doc/refman/5.0/en/mysql-ssl-set.html
# ssl: {'ca': '/path/to/ca'}
email:
server: 192.168.10.4
port: 25
user:
password:
timeout: 60
from_addr: hpcs.mon@hp.com
notification_types:
email:
server: 192.168.10.4
port: 25
user:
password:
timeout: 60
from_addr: hpcs.mon@hp.com
webhook:
timeout: 5
webhook:
timeout: 5
pagerduty:
timeout: 5
url: "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
pagerduty:
timeout: 5
url: "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
processors:
alarm:

View File

@ -0,0 +1,302 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import smtplib
import socket
import time
import unittest
from monasca_notification.notification import Notification
from monasca_notification.types import email_notifier
def alarm(metrics):
return {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
class smtpStub(object):
def __init__(self, trap):
self.trap = trap
def sendmail(self, from_addr, to_addr, msg):
self.trap.append("%s %s %s" % (from_addr, to_addr, msg))
class smtpStubException(object):
def __init__(self, queue):
self.queue = queue
def sendmail(self, from_addr, to_addr, msg):
raise smtplib.SMTPServerDisconnected
class TestEmail(unittest.TestCase):
def setUp(self):
self.trap = []
self.email_config = {'server': 'my.smtp.server',
'port': 25,
'user': None,
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
def tearDown(self):
pass
def _smtpStub(self, *arg, **kwargs):
return smtpStub(self.trap)
def _smtbStubException(self, *arg, **kwargs):
return smtpStubException(self.trap)
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def notify(self, smtp_stub, metric, mock_smtp):
mock_smtp.SMTP = smtp_stub
mock_log = mock.MagicMock()
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
alarm_dict = alarm(metric)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.trap.append(email.send_notification(notification))
def test_email_notification_single_host(self):
"""Email with single host
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
self.notify(self._smtpStub, metrics)
email = self.trap.pop(0)
self.assertRegexpMatches(email, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(email, "To: me@here.com")
self.assertRegexpMatches(email, "Content-Type: text/plain")
self.assertRegexpMatches(email, "Alarm .test Alarm.")
self.assertRegexpMatches(email, "On host .foo1.")
return_value = self.trap.pop(0)
self.assertTrue(return_value)
def test_email_notification_multiple_hosts(self):
"""Email with multiple hosts
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
self.notify(self._smtpStub, metrics)
email = self.trap.pop(0)
self.assertRegexpMatches(email, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(email, "To: me@here.com")
self.assertRegexpMatches(email, "Content-Type: text/plain")
self.assertRegexpMatches(email, "Alarm .test Alarm.")
self.assertNotRegexpMatches(email, "foo1")
self.assertNotRegexpMatches(email, "foo2")
return_value = self.trap.pop(0)
self.assertTrue(return_value)
@mock.patch('monasca_notification.types.email_notifier.sys')
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_connection_twice(self, mock_smtp, mock_sys):
"""Email that fails on smtp_connect twice
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
mock_log = mock.MagicMock()
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.debug = self.trap.append
mock_log.info = self.trap.append
mock_log.exception = self.trap.append
# mock_smtp.SMTP.return_value = mock_smtp
mock_smtp.SMTP.side_effect = [mock_smtp,
smtplib.SMTPServerDisconnected,
socket.error]
mock_smtp.sendmail.side_effect = [smtplib.SMTPServerDisconnected,
smtplib.SMTPServerDisconnected]
# There has to be a better way to preserve exception definitions when
# we're mocking access to a library
mock_smtp.SMTPServerDisconnected = smtplib.SMTPServerDisconnected
mock_smtp.SMTPException = smtplib.SMTPException
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.trap.append(email.send_notification(notification))
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
self.assertIn("Unable to connect to email server. Exiting.", self.trap)
self.assertTrue(mock_sys.exit.called)
@mock.patch('monasca_notification.types.email_notifier.sys')
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp, mock_sys):
"""Email that fails on smtp_connect once then email
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
mock_log = mock.MagicMock()
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.debug = self.trap.append
mock_log.info = self.trap.append
mock_log.exception = self.trap.append
mock_smtp.SMTP.return_value = mock_smtp
mock_smtp.sendmail.side_effect = [smtplib.SMTPServerDisconnected,
smtplib.SMTPException]
# There has to be a better way to preserve exception definitions when
# we're mocking access to a library
mock_smtp.SMTPServerDisconnected = smtplib.SMTPServerDisconnected
mock_smtp.SMTPException = smtplib.SMTPException
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.trap.append(email.send_notification(notification))
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
self.assertIn("Error sending Email Notification", self.trap)
self.assertFalse(mock_sys.exit.called)
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_connection_once(self, mock_smtp):
"""Email that fails on smtp_connect once
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
mock_log = mock.MagicMock()
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.debug = self.trap.append
mock_log.info = self.trap.append
mock_log.exception = self.trap.append
mock_smtp.SMTP.return_value = mock_smtp
mock_smtp.sendmail.side_effect = [smtplib.SMTPServerDisconnected, None]
# There has to be a better way to preserve exception definitions when
# we're mocking access to a library
mock_smtp.SMTPServerDisconnected = smtplib.SMTPServerDisconnected
mock_smtp.SMTPException = smtplib.SMTPException
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.trap.append(email.send_notification(notification))
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
self.assertIn("Sent email to %s, notification %s"
% (notification.address, notification.to_json()), self.trap)
@mock.patch('monasca_notification.types.email_notifier.smtplib')
def test_smtp_sendmail_failed_exception(self, mock_smtp):
"""Email that fails on exception
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
mock_log = mock.MagicMock()
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.debug = self.trap.append
mock_log.info = self.trap.append
mock_log.exception = self.trap.append
mock_smtp.SMTP.return_value = mock_smtp
mock_smtp.sendmail.side_effect = smtplib.SMTPException
# There has to be a better way to preserve exception definitions when
# we're mocking access to a library
mock_smtp.SMTPServerDisconnected = smtplib.SMTPServerDisconnected
mock_smtp.SMTPException = smtplib.SMTPException
email = email_notifier.EmailNotifier(mock_log)
email.config(self.email_config)
alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.trap.append(email.send_notification(notification))
self.assertNotIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
self.assertIn("Error sending Email Notification", self.trap)

View File

@ -15,10 +15,8 @@
"""Tests NotificationProcessor"""
import json
import mock
import multiprocessing
import requests
import time
import unittest
@ -42,7 +40,7 @@ class requestsResponse(object):
class TestStateTracker(unittest.TestCase):
def setUp(self):
self.http_func = self._http_post_200
self.trap = []
self.notification_queue = multiprocessing.Queue(10)
self.sent_notification_queue = multiprocessing.Queue(10)
self.finished_queue = multiprocessing.Queue(10)
@ -53,8 +51,6 @@ class TestStateTracker(unittest.TestCase):
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
self.webhook_config = {'timeout': 50}
self.pagerduty_config = {'timeout': 50, 'key': 'foobar'}
def tearDown(self):
self.assertTrue(self.log_queue.empty())
@ -66,31 +62,25 @@ class TestStateTracker(unittest.TestCase):
# Test helper functions
# ------------------------------------------------------------------------
@mock.patch('monasca_notification.processors.notification_processor.requests')
@mock.patch('monasca_notification.processors.notification_processor.smtplib')
@mock.patch('monasca_notification.processors.notification_processor.log')
def _start_processor(self, mock_log, mock_smtp, mock_requests):
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.processors.notification_processor.notifiers.log')
def _start_processor(self, mock_log, mock_smtp):
"""Start the processor with the proper mocks
"""
# Since the log runs in another thread I can mock it directly, instead change the methods to put to a queue
mock_log.warn = self.log_queue.put
mock_log.error = self.log_queue.put
mock_requests.post = self.http_func
mock_smtp.SMTP = self._smtpStub
self.mock_requests = mock_requests
self.mock_log = mock_log
self.mock_smtp = mock_smtp
config = {}
config["email"] = self.email_config
nprocessor = (notification_processor.
NotificationProcessor(self.notification_queue,
self.sent_notification_queue,
self.finished_queue,
self.email_config,
self.webhook_config,
self.pagerduty_config))
config))
self.processor = multiprocessing.Process(target=nprocessor.run)
self.processor.start()
@ -98,141 +88,27 @@ class TestStateTracker(unittest.TestCase):
def _smtpStub(self, *arg, **kwargs):
return smtpStub(self.log_queue)
def _http_post_200(self, url, data, headers, **kwargs):
self.log_queue.put(url)
self.log_queue.put(data)
self.log_queue.put(headers)
r = requestsResponse(200)
return r
def _http_post_201(self, url, data, headers, **kwargs):
self.log_queue.put(url)
self.log_queue.put(data)
self.log_queue.put(headers)
r = requestsResponse(201)
return r
def _http_post_202(self, url, data, headers, **kwargs):
r = requestsResponse(202)
return r
def _http_post_204(self, url, data, headers, **kwargs):
self.log_queue.put(url)
self.log_queue.put(data)
self.log_queue.put(headers)
r = requestsResponse(204)
return r
def _http_post_400(self, url, data, headers, **kwargs):
r = requestsResponse(400)
return r
def _http_post_403(self, url, data, headers, **kwargs):
r = requestsResponse(403)
return r
def _http_post_404(self, url, data, headers, **kwargs):
r = requestsResponse(404)
return r
def _http_post_500(self, url, data, headers, **kwargs):
r = requestsResponse(500)
return r
def _http_post_504(self, url, data, headers, **kwargs):
r = requestsResponse(504)
return r
def _http_post_exception(self, url, data, headers, **kwargs):
self.log_queue.put("timeout %s" % kwargs["timeout"])
raise requests.exceptions.Timeout
def assertSentNotification(self):
notification_msg = self.sent_notification_queue.get(timeout=3)
self.assertNotEqual(notification_msg, None)
def assertSentFinished(self):
finished_msg = self.finished_queue.get(timeout=3)
self.assertNotEqual(finished_msg, None)
def alarm(self, metrics):
return {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
def email_setup(self, metric):
alarm_dict = self.alarm(metric)
alarm_dict = {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metric}
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
def webhook_setup(self, http_func):
self.http_func = http_func
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = self.alarm(metrics)
notification = Notification('webhook', 0, 1, 'email notification', 'me@here.com', alarm_dict)
self.notification_queue.put([notification])
self._start_processor()
def pagerduty_setup(self, http_stub):
self.http_func = http_stub
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
alarm_dict = self.alarm(metrics)
notification = Notification('pagerduty',
0,
1,
'pagerduty notification',
'ABCDEF',
alarm_dict)
self._start_processor()
self.notification_queue.put([notification])
def valid_pagerduty_message(self, url, data, headers):
self.assertEqual(
url, 'https://events.pagerduty.com/generic/2010-04-15/create_event.json')
headers = dict(headers)
self.assertEqual(headers['content-type'], 'application/json')
data = dict(json.loads(data))
self.assertEqual(data['service_key'], 'ABCDEF')
self.assertEqual(data['event_type'], 'trigger')
self.assertEqual(data['description'], 'I am alarming!')
self.assertEqual(data['client'], 'Monasca')
self.assertEqual(data['client_url'], '')
details = dict(data['details'])
self.assertEqual(details['alarm_id'], '0')
self.assertEqual(details['alarm_name'], 'test Alarm')
self.assertEqual(details['current'], 'ALARM')
self.assertEqual(details['message'], 'I am alarming!')
def pagerduty_http_error(self, log_msg, http_response):
self.assertRegexpMatches(log_msg, "Error with pagerduty request.")
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
self.assertRegexpMatches(log_msg, "response=%s" % http_response)
def _logQueueToTrap(self):
while True:
try:
self.trap.append(self.log_queue.get(timeout=1))
except Exception:
return
# ------------------------------------------------------------------------
# Unit tests
@ -247,12 +123,13 @@ class TestStateTracker(unittest.TestCase):
self.notification_queue.put([invalid_notification])
self._start_processor()
finished = self.finished_queue.get(timeout=2)
log_msg = self.log_queue.get(timeout=1)
self.processor.terminate()
self._logQueueToTrap()
self.assertTrue(finished == (0, 1))
self.assertTrue(log_msg == 'Notification type invalid is not a valid type')
self.assertIn('attempting to send unconfigured notification: invalid', self.trap)
def test_email_notification_single_host(self):
"""Email with single host
@ -264,191 +141,15 @@ class TestStateTracker(unittest.TestCase):
self.email_setup(metrics)
log_msg = self.log_queue.get(timeout=3)
self._logQueueToTrap()
self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(log_msg, "To: me@here.com")
self.assertRegexpMatches(log_msg, "Content-Type: text/plain")
self.assertRegexpMatches(log_msg, "Alarm .test Alarm.")
self.assertRegexpMatches(log_msg, "On host .foo1.")
for msg in self.trap:
if "From: hpcs.mon@hp.com" in msg:
self.assertRegexpMatches(msg, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(msg, "To: me@here.com")
self.assertRegexpMatches(msg, "Content-Type: text/plain")
self.assertRegexpMatches(msg, "Alarm .test Alarm.")
self.assertRegexpMatches(msg, "On host .foo1.")
self.assertSentNotification()
def test_email_notification_multiple_hosts(self):
"""Email with multiple hosts
"""
metrics = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metrics.append(metric_data)
metric_data = {'dimensions': {'hostname': 'foo2', 'service': 'bar2'}}
metrics.append(metric_data)
self.email_setup(metrics)
log_msg = self.log_queue.get(timeout=3)
self.assertRegexpMatches(log_msg, "From: hpcs.mon@hp.com")
self.assertRegexpMatches(log_msg, "To: me@here.com")
self.assertRegexpMatches(log_msg, "Content-Type: text/plain")
self.assertRegexpMatches(log_msg, "Alarm .test Alarm.")
self.assertNotRegexpMatches(log_msg, "foo1")
self.assertNotRegexpMatches(log_msg, "foo2")
self.assertSentNotification()
def test_webhook_good_http_response(self):
"""webhook 200
"""
self.webhook_setup(self._http_post_200)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.assertEqual(url, "me@here.com")
self.assertEqual(data, {'alarm_id': '0'})
self.assertEqual(headers, {'content-type': 'application/json'})
self.assertSentNotification()
def test_webhook_bad_http_response(self):
"""webhook bad response
"""
self.webhook_setup(self._http_post_404)
log_msg = self.log_queue.get(timeout=3)
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
self.assertRegexpMatches(log_msg, "HTTP code 404")
self.assertRegexpMatches(log_msg, "post on URL me@here.com")
self.assertSentFinished()
def test_webhook_timeout_exception_on_http_response(self):
"""webhook timeout exception
"""
self.webhook_setup(self._http_post_exception)
log_msg = self.log_queue.get(timeout=3)
self.assertEqual(log_msg, "timeout 50")
log_msg = self.log_queue.get(timeout=3)
self.assertNotRegexpMatches(log_msg, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(log_msg, "content-type.: .application/json")
self.assertRegexpMatches(log_msg, "Error trying to post on URL me@here")
self.assertRaises(requests.exceptions.Timeout)
self.assertSentFinished()
def test_pagerduty_200(self):
"""pagerduty 200
"""
self.pagerduty_setup(self._http_post_200)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.valid_pagerduty_message(url, data, headers)
self.assertSentNotification()
def test_pagerduty_201(self):
"""pagerduty 201
"""
self.pagerduty_setup(self._http_post_201)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.valid_pagerduty_message(url, data, headers)
self.assertSentNotification()
def test_pagerduty_204(self):
"""pagerduty 204
"""
self.pagerduty_setup(self._http_post_204)
url = self.log_queue.get(timeout=3)
data = self.log_queue.get(timeout=3)
headers = self.log_queue.get(timeout=3)
self.valid_pagerduty_message(url, data, headers)
self.assertSentNotification()
def test_pagerduty_202(self):
"""pagerduty 202
"""
self.pagerduty_setup(self._http_post_202)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "202")
self.assertSentFinished()
def test_pagerduty_400(self):
"""pagerduty 400
"""
self.pagerduty_setup(self._http_post_400)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "400")
self.assertSentFinished()
def test_pagerduty_403(self):
"""pagerduty 403
"""
self.pagerduty_setup(self._http_post_403)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "403")
self.assertSentFinished()
def test_pagerduty_500(self):
"""pagerduty 500
"""
self.pagerduty_setup(self._http_post_500)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "500")
self.assertSentFinished()
def test_pagerduty_504(self):
"""pagerduty 504
"""
self.pagerduty_setup(self._http_post_504)
log_msg = self.log_queue.get(timeout=3)
self.pagerduty_http_error(log_msg, "504")
self.assertSentFinished()
def test_pagerduty_exception(self):
"""pagerduty exception
"""
self.pagerduty_setup(self._http_post_exception)
log_msg = self.log_queue.get(timeout=3)
self.assertEqual(log_msg, "timeout 50")
log_msg = self.log_queue.get(timeout=3)
self.assertRegexpMatches(log_msg, "Exception on pagerduty request")
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
self.assertRegexpMatches(
log_msg, "exception=<class 'requests.exceptions.Timeout'>")
self.assertRaises(requests.exceptions.Timeout)
self.assertSentFinished()
notification_msg = self.sent_notification_queue.get(timeout=3)
self.assertNotEqual(notification_msg, None)

363
tests/test_notifiers.py Normal file
View File

@ -0,0 +1,363 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import mock
import time
import unittest
from monasca_notification.notification import Notification
from monasca_notification.types import notifiers
def alarm(metrics):
return {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
class NotifyStub(object):
def __init__(self, trap, config, send, failure):
self.config_exception = config
self.send_exception = send
self.failure = failure
self.trap = trap
@property
def type(self):
return "email"
@property
def statsd_name(self):
return "smtp_sent"
def config(self, config_dict):
if self.config_exception:
raise Exception
else:
pass
def send_notification(self, notification_obj):
if self.send_exception:
raise Exception
else:
if self.failure:
return False
else:
return True
class Statsd(object):
def __init__(self):
self.timer = StatsdTimer()
self.counter = StatsdCounter()
def get_timer(self):
return self.timer
def get_counter(self, key):
return self.counter
class StatsdTimer(object):
def __init__(self):
self.timer_calls = {}
@contextlib.contextmanager
def time(self, key):
self.start(key)
yield
self.stop(key)
def start(self, key):
key = key + "_start"
if key in self.timer_calls:
self.timer_calls[key] += 1
else:
self.timer_calls[key] = 1
def stop(self, key):
key = key + "_stop"
if key in self.timer_calls:
self.timer_calls[key] += 1
else:
self.timer_calls[key] = 1
class StatsdCounter(object):
def __init__(self):
self.counter = 0
def increment(self, val):
self.counter += val
class TestInterface(unittest.TestCase):
def setUp(self):
self.trap = []
self.statsd = Statsd()
self.email_config = {'server': 'my.smtp.server',
'port': 25,
'user': None,
'password': None,
'timeout': 60,
'from_addr': 'hpcs.mon@hp.com'}
def tearDown(self):
notifiers.possible_notifiers = []
notifiers.configured_notifiers = {}
self.trap = []
def _configExceptionStub(self, log):
return NotifyStub(self.trap, True, False, False)
def _sendExceptionStub(self, log):
return NotifyStub(self.trap, False, True, False)
def _sendFailureStub(self, log):
return NotifyStub(self.trap, False, False, True)
def _goodSendStub(self, log):
return NotifyStub(self.trap, False, False, False)
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_enabled_notifications(self, mock_log, mock_smtp):
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'xyz.com'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifications = notifiers.enabled_notifications()
self.assertEqual(len(notifications), 3)
self.assertEqual(sorted(notifications),
["email", "pagerduty", "webhook"])
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_config_missing_data(self, mock_log, mock_smtp):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.info = self.trap.append
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("No config data for type: pagerduty", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_config_exception(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.exception = self.trap.append
mock_email.EmailNotifier = self._configExceptionStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("config exception for email", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_config_correct(self, mock_log, mock_smtp):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.info = self.trap.append
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("email notification ready", self.trap)
self.assertIn("webhook notification ready", self.trap)
self.assertIn("pagerduty notification ready", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_send_notification_exception(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.exception = self.trap.append
mock_email.EmailNotifier = self._sendExceptionStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
notifiers.send_notifications(notifications)
self.assertIn("send_notification exception for email", self.trap)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_send_notification_failure(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.exception = self.trap.append
mock_email.EmailNotifier = self._sendFailureStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(sent, [])
self.assertEqual(failed, 1)
self.assertEqual(invalid, 0)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_send_notification_unconfigured(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_log.info = self.trap.append
mock_email.EmailNotifier = self._sendExceptionStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
self.assertIn("No config data for type: pagerduty", self.trap)
notifications = []
notifications.append(Notification('pagerduty', 0, 1,
'pagerduty notification',
'me@here.com', alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(sent, [])
self.assertEqual(failed, 0)
self.assertEqual(invalid, 1)
self.assertIn("attempting to send unconfigured notification: pagerduty", self.trap)
@mock.patch('monasca_notification.types.notifiers.time')
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_send_notification_correct(self, mock_log, mock_smtp,
mock_email, mock_time):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_email.EmailNotifier = self._goodSendStub
mock_time.time.return_value = 42
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'foo@here.com', alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'bar@here.com', alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications)
self.assertEqual(len(sent), 3)
self.assertEqual(failed, 0)
self.assertEqual(invalid, 0)
for n in sent:
self.assertEqual(n.notification_timestamp, 42)
@mock.patch('monasca_notification.types.notifiers.email_notifier')
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
@mock.patch('monasca_notification.types.notifiers.log')
def test_statsd(self, mock_log, mock_smtp, mock_email):
mock_log.warn = self.trap.append
mock_log.error = self.trap.append
mock_email.EmailNotifier = self._goodSendStub
config_dict = {'email': self.email_config,
'webhook': {'address': 'xyz.com'},
'pagerduty': {'address': 'abc'}}
notifiers.init(self.statsd)
notifiers.config(config_dict)
notifications = []
notifications.append(Notification('email', 0, 1,
'email notification',
'me@here.com', alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'foo@here.com', alarm({})))
notifications.append(Notification('email', 0, 1,
'email notification',
'bar@here.com', alarm({})))
notifiers.send_notifications(notifications)
self.assertEqual(self.statsd.timer.timer_calls['email_time_start'], 3)
self.assertEqual(self.statsd.timer.timer_calls['email_time_stop'], 3)
self.assertEqual(self.statsd.counter.counter, 3)

View File

@ -0,0 +1,271 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import Queue
import requests
import time
import unittest
from monasca_notification.notification import Notification
from monasca_notification.types import pagerduty_notifier
def alarm(metrics):
return {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
class requestsResponse(object):
def __init__(self, status):
self.status_code = status
class TestWebhook(unittest.TestCase):
def setUp(self):
self.trap = Queue.Queue()
self.pagerduty_config = {'timeout': 50, 'key': 'foobar'}
def tearDown(self):
self.assertTrue(self.trap.empty())
def _http_post_200(self, url, data, headers, **kwargs):
self.trap.put(url)
self.trap.put(data)
self.trap.put(headers)
r = requestsResponse(200)
return r
def _http_post_201(self, url, data, headers, **kwargs):
self.trap.put(url)
self.trap.put(data)
self.trap.put(headers)
r = requestsResponse(201)
return r
def _http_post_202(self, url, data, headers, **kwargs):
r = requestsResponse(202)
return r
def _http_post_204(self, url, data, headers, **kwargs):
self.trap.put(url)
self.trap.put(data)
self.trap.put(headers)
r = requestsResponse(204)
return r
def _http_post_400(self, url, data, headers, **kwargs):
r = requestsResponse(400)
return r
def _http_post_403(self, url, data, headers, **kwargs):
r = requestsResponse(403)
return r
def _http_post_404(self, url, data, headers, **kwargs):
r = requestsResponse(404)
return r
def _http_post_500(self, url, data, headers, **kwargs):
r = requestsResponse(500)
return r
def _http_post_504(self, url, data, headers, **kwargs):
r = requestsResponse(504)
return r
def _http_post_exception(self, url, data, headers, **kwargs):
self.trap.put("timeout %s" % kwargs["timeout"])
raise requests.exceptions.Timeout
def valid_pagerduty_message(self, url, data, headers):
self.assertEqual(
url, 'https://events.pagerduty.com/generic/2010-04-15/create_event.json')
headers = dict(headers)
self.assertEqual(headers['content-type'], 'application/json')
data = dict(json.loads(data))
self.assertEqual(data['service_key'], 'ABCDEF')
self.assertEqual(data['event_type'], 'trigger')
self.assertEqual(data['description'], 'I am alarming!')
self.assertEqual(data['client'], 'Monasca')
self.assertEqual(data['client_url'], '')
details = dict(data['details'])
self.assertEqual(details['alarm_id'], '0')
self.assertEqual(details['alarm_name'], 'test Alarm')
self.assertEqual(details['current'], 'ALARM')
self.assertEqual(details['message'], 'I am alarming!')
def pagerduty_http_error(self, log_msg, http_response):
self.assertRegexpMatches(log_msg, "Error with pagerduty request.")
self.assertRegexpMatches(log_msg, "key=<ABCDEF>")
self.assertRegexpMatches(log_msg, "response=%s" % http_response)
@mock.patch('monasca_notification.types.pagerduty_notifier.requests')
def notify(self, http_func, mock_requests):
mock_log = mock.MagicMock()
mock_log.warn = self.trap.put
mock_log.error = self.trap.put
mock_log.exception = self.trap.put
mock_requests.post = http_func
pagerduty = pagerduty_notifier.PagerdutyNotifier(mock_log)
pagerduty.config(self.pagerduty_config)
metric = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metric.append(metric_data)
alarm_dict = alarm(metric)
notification = Notification('pagerduty',
0,
1,
'pagerduty notification',
'ABCDEF',
alarm_dict)
self.trap.put(pagerduty.send_notification(notification))
def test_pagerduty_200(self):
"""pagerduty 200
"""
self.notify(self._http_post_200)
url = self.trap.get(timeout=1)
data = self.trap.get(timeout=1)
headers = self.trap.get(timeout=1)
self.valid_pagerduty_message(url, data, headers)
return_value = self.trap.get()
self.assertTrue(return_value)
def test_pagerduty_201(self):
"""pagerduty 201
"""
self.notify(self._http_post_201)
url = self.trap.get(timeout=1)
data = self.trap.get(timeout=1)
headers = self.trap.get(timeout=1)
self.valid_pagerduty_message(url, data, headers)
return_value = self.trap.get()
self.assertTrue(return_value)
def test_pagerduty_204(self):
"""pagerduty 204
"""
self.notify(self._http_post_204)
url = self.trap.get(timeout=1)
data = self.trap.get(timeout=1)
headers = self.trap.get(timeout=1)
self.valid_pagerduty_message(url, data, headers)
return_value = self.trap.get()
self.assertTrue(return_value)
def test_pagerduty_202(self):
"""pagerduty 202
"""
self.notify(self._http_post_202)
results = self.trap.get(timeout=1)
self.pagerduty_http_error(results, "202")
return_value = self.trap.get()
self.assertFalse(return_value)
def test_pagerduty_400(self):
"""pagerduty 400
"""
self.notify(self._http_post_400)
results = self.trap.get(timeout=1)
self.pagerduty_http_error(results, "400")
return_value = self.trap.get()
self.assertFalse(return_value)
def test_pagerduty_403(self):
"""pagerduty 403
"""
self.notify(self._http_post_403)
results = self.trap.get(timeout=1)
self.pagerduty_http_error(results, "403")
return_value = self.trap.get()
self.assertFalse(return_value)
def test_pagerduty_500(self):
"""pagerduty 500
"""
self.notify(self._http_post_500)
results = self.trap.get(timeout=1)
self.pagerduty_http_error(results, "500")
return_value = self.trap.get()
self.assertFalse(return_value)
def test_pagerduty_504(self):
"""pagerduty 504
"""
self.notify(self._http_post_504)
results = self.trap.get(timeout=1)
self.pagerduty_http_error(results, "504")
return_value = self.trap.get()
self.assertFalse(return_value)
def test_pagerduty_exception(self):
"""pagerduty exception
"""
self.notify(self._http_post_exception)
results = self.trap.get(timeout=1)
self.assertEqual(results, "timeout 50")
results = self.trap.get(timeout=1)
self.assertRegexpMatches(results, "Exception on pagerduty request")
self.assertRegexpMatches(results, "key=<ABCDEF>")
self.assertRaises(requests.exceptions.Timeout)
return_value = self.trap.get()
self.assertFalse(return_value)

View File

@ -0,0 +1,140 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import Queue
import requests
import time
import unittest
from monasca_notification.notification import Notification
from monasca_notification.types import webhook_notifier
def alarm(metrics):
return {"tenantId": "0",
"alarmId": "0",
"alarmName": "test Alarm",
"oldState": "OK",
"newState": "ALARM",
"stateChangeReason": "I am alarming!",
"timestamp": time.time(),
"metrics": metrics}
class requestsResponse(object):
def __init__(self, status):
self.status_code = status
class TestWebhook(unittest.TestCase):
def setUp(self):
self.trap = Queue.Queue()
self.webhook_config = {'timeout': 50}
def tearDown(self):
self.assertTrue(self.trap.empty())
def _http_post_200(self, url, data, headers, **kwargs):
self.trap.put(url)
self.trap.put(data)
self.trap.put(headers)
r = requestsResponse(200)
return r
def _http_post_404(self, url, data, headers, **kwargs):
r = requestsResponse(404)
return r
def _http_post_exception(self, url, data, headers, **kwargs):
self.trap.put("timeout %s" % kwargs["timeout"])
raise requests.exceptions.Timeout
@mock.patch('monasca_notification.types.webhook_notifier.requests')
def notify(self, http_func, mock_requests):
mock_log = mock.MagicMock()
mock_log.warn = self.trap.put
mock_log.error = self.trap.put
mock_log.exception = self.trap.put
mock_requests.post = http_func
webhook = webhook_notifier.WebhookNotifier(mock_log)
webhook.config(self.webhook_config)
metric = []
metric_data = {'dimensions': {'hostname': 'foo1', 'service': 'bar1'}}
metric.append(metric_data)
alarm_dict = alarm(metric)
notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', alarm_dict)
self.trap.put(webhook.send_notification(notification))
def test_webhook_good_http_response(self):
"""webhook 200
"""
self.notify(self._http_post_200)
url = self.trap.get(timeout=1)
data = self.trap.get(timeout=1)
headers = self.trap.get(timeout=1)
self.assertEqual(url, "me@here.com")
self.assertEqual(data, {'alarm_id': '0'})
self.assertEqual(headers, {'content-type': 'application/json'})
return_value = self.trap.get()
self.assertTrue(return_value)
def test_webhook_bad_http_response(self):
"""webhook bad response
"""
self.notify(self._http_post_404)
error = self.trap.get()
self.assertNotRegexpMatches(error, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(error, "content-type.: .application/json")
self.assertRegexpMatches(error, "HTTP code 404")
self.assertRegexpMatches(error, "post on URL me@here.com")
return_value = self.trap.get()
self.assertFalse(return_value)
def test_webhook_timeout_exception_on_http_response(self):
"""webhook timeout exception
"""
self.notify(self._http_post_exception)
result = self.trap.get()
self.assertEqual(result, "timeout 50")
result = self.trap.get()
self.assertNotRegexpMatches(result, "alarm_id.: .test Alarm")
self.assertNotRegexpMatches(result, "content-type.: .application/json")
self.assertRegexpMatches(result, "Error trying to post on URL me@here")
self.assertRaises(requests.exceptions.Timeout)
return_value = self.trap.get()
self.assertFalse(return_value)