Merge "Added retry engine to notification system"
This commit is contained in:
commit
e5ede47918
@ -28,6 +28,7 @@ import time
|
|||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from notification_engine import NotificationEngine
|
from notification_engine import NotificationEngine
|
||||||
|
from retry_engine import RetryEngine
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
processors = [] # global list to facilitate clean signal handling
|
processors = [] # global list to facilitate clean signal handling
|
||||||
@ -67,6 +68,12 @@ def clean_exit(signum, frame=None):
|
|||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
def start_process(proccess_type, config):
|
||||||
|
log.info("start process: {}".format(proccess_type))
|
||||||
|
p = proccess_type(config)
|
||||||
|
p.run()
|
||||||
|
|
||||||
|
|
||||||
def main(argv=None):
|
def main(argv=None):
|
||||||
if argv is None:
|
if argv is None:
|
||||||
argv = sys.argv
|
argv = sys.argv
|
||||||
@ -84,8 +91,12 @@ def main(argv=None):
|
|||||||
# Setup logging
|
# Setup logging
|
||||||
logging.config.dictConfig(config['logging'])
|
logging.config.dictConfig(config['logging'])
|
||||||
|
|
||||||
notifier = multiprocessing.Process(target=NotificationEngine(config).run)
|
for proc in range(0, config['processors']['notification']['number']):
|
||||||
processors.append(notifier)
|
processors.append(multiprocessing.Process(
|
||||||
|
target=start_process, args=(NotificationEngine, config)))
|
||||||
|
|
||||||
|
processors.append(multiprocessing.Process(
|
||||||
|
target=start_process, args=(RetryEngine, config)))
|
||||||
|
|
||||||
# Start
|
# Start
|
||||||
try:
|
try:
|
||||||
|
@ -33,17 +33,21 @@ class Notification(object):
|
|||||||
'state',
|
'state',
|
||||||
'tenant_id',
|
'tenant_id',
|
||||||
'type',
|
'type',
|
||||||
'metrics'
|
'metrics',
|
||||||
|
'retry_count',
|
||||||
|
'raw_alarm'
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, ntype, src_partition, src_offset, name, address, alarm):
|
def __init__(self, ntype, src_partition, src_offset, name, address,
|
||||||
|
retry_count, alarm):
|
||||||
"""Setup the notification object
|
"""Setup the notification object
|
||||||
The src_partition and src_offset allow the notification
|
The src_partition and src_offset allow the notification
|
||||||
to be linked to the alarm that it came from.
|
to be linked to the alarm that it came from.
|
||||||
ntype - The notification type
|
ntype - The notification type
|
||||||
name - Name used in sending
|
name - Name used in sending
|
||||||
address - to send the notification to
|
address - to send the notification to
|
||||||
alarm_data - info that caused the notification
|
retry_count - number of times we've tried to send
|
||||||
|
alarm - info that caused the notification
|
||||||
notifications that come after this one to remain uncommitted.
|
notifications that come after this one to remain uncommitted.
|
||||||
"""
|
"""
|
||||||
self.address = address
|
self.address = address
|
||||||
@ -51,6 +55,9 @@ class Notification(object):
|
|||||||
self.src_partition = src_partition
|
self.src_partition = src_partition
|
||||||
self.src_offset = src_offset
|
self.src_offset = src_offset
|
||||||
self.type = ntype
|
self.type = ntype
|
||||||
|
self.retry_count = retry_count
|
||||||
|
|
||||||
|
self.raw_alarm = alarm
|
||||||
|
|
||||||
self.alarm_id = alarm['alarmId']
|
self.alarm_id = alarm['alarmId']
|
||||||
self.alarm_name = alarm['alarmName']
|
self.alarm_name = alarm['alarmName']
|
||||||
@ -77,8 +84,11 @@ class Notification(object):
|
|||||||
"""Return json representation
|
"""Return json representation
|
||||||
"""
|
"""
|
||||||
notification_fields = [
|
notification_fields = [
|
||||||
'address',
|
'type',
|
||||||
'name',
|
'name',
|
||||||
|
'address',
|
||||||
|
'retry_count',
|
||||||
|
'raw_alarm',
|
||||||
'alarm_id',
|
'alarm_id',
|
||||||
'alarm_name',
|
'alarm_name',
|
||||||
'alarm_timestamp',
|
'alarm_timestamp',
|
||||||
|
@ -26,13 +26,14 @@ class NotificationEngine(object):
|
|||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
self._topics = {}
|
self._topics = {}
|
||||||
self._topics['notification_topic'] = config['kafka']['notification_topic']
|
self._topics['notification_topic'] = config['kafka']['notification_topic']
|
||||||
|
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
|
||||||
|
|
||||||
self._statsd = monascastatsd.Client(name='monasca',
|
self._statsd = monascastatsd.Client(name='monasca',
|
||||||
dimensions=BaseProcessor.dimensions)
|
dimensions=BaseProcessor.dimensions)
|
||||||
|
|
||||||
self._consumer = KafkaConsumer(config['kafka']['url'],
|
self._consumer = KafkaConsumer(config['kafka']['url'],
|
||||||
config['zookeeper']['url'],
|
config['zookeeper']['url'],
|
||||||
config['zookeeper']['path'],
|
config['zookeeper']['notification_path'],
|
||||||
config['kafka']['group'],
|
config['kafka']['group'],
|
||||||
config['kafka']['alarm_topic'])
|
config['kafka']['alarm_topic'])
|
||||||
|
|
||||||
@ -62,6 +63,7 @@ class NotificationEngine(object):
|
|||||||
if notifications:
|
if notifications:
|
||||||
sent, failed = self._notifier.send(notifications)
|
sent, failed = self._notifier.send(notifications)
|
||||||
self._producer.publish(self._topics['notification_topic'], sent)
|
self._producer.publish(self._topics['notification_topic'], sent)
|
||||||
|
self._producer.publish(self._topics['retry_topic'], failed)
|
||||||
|
|
||||||
self._consumer.commit([partition])
|
self._consumer.commit([partition])
|
||||||
|
|
||||||
|
@ -125,6 +125,7 @@ class AlarmProcessor(BaseProcessor):
|
|||||||
offset,
|
offset,
|
||||||
row[0],
|
row[0],
|
||||||
row[2],
|
row[2],
|
||||||
|
0,
|
||||||
alarm) for row in cur]
|
alarm) for row in cur]
|
||||||
|
|
||||||
if len(notifications) == 0:
|
if len(notifications) == 0:
|
||||||
|
@ -17,6 +17,7 @@ import kafka.client
|
|||||||
import kafka.producer
|
import kafka.producer
|
||||||
import logging
|
import logging
|
||||||
import monascastatsd
|
import monascastatsd
|
||||||
|
import time
|
||||||
|
|
||||||
from monasca_notification.processors.base import BaseProcessor
|
from monasca_notification.processors.base import BaseProcessor
|
||||||
|
|
||||||
@ -34,10 +35,10 @@ class KafkaProducer(BaseProcessor):
|
|||||||
self._statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions)
|
self._statsd = monascastatsd.Client(name='monasca', dimensions=BaseProcessor.dimensions)
|
||||||
|
|
||||||
self._kafka = kafka.client.KafkaClient(url)
|
self._kafka = kafka.client.KafkaClient(url)
|
||||||
self._producer = kafka.producer.SimpleProducer(
|
self._producer = kafka.producer.KeyedProducer(
|
||||||
self._kafka,
|
self._kafka,
|
||||||
async=False,
|
async=False,
|
||||||
req_acks=kafka.producer.SimpleProducer.ACK_AFTER_LOCAL_WRITE,
|
req_acks=kafka.producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
|
||||||
ack_timeout=2000)
|
ack_timeout=2000)
|
||||||
|
|
||||||
def publish(self, topic, messages):
|
def publish(self, topic, messages):
|
||||||
@ -46,7 +47,13 @@ class KafkaProducer(BaseProcessor):
|
|||||||
published_to_kafka = self._statsd.get_counter(name='published_to_kafka')
|
published_to_kafka = self._statsd.get_counter(name='published_to_kafka')
|
||||||
|
|
||||||
for message in messages:
|
for message in messages:
|
||||||
responses = self._producer.send_messages(topic, message.to_json())
|
key = time.time() * 1000
|
||||||
|
try:
|
||||||
|
responses = self._producer.send(topic, key, message.to_json())
|
||||||
|
except Exception:
|
||||||
|
log.exception("error publishing message to kafka")
|
||||||
|
continue
|
||||||
|
|
||||||
published_to_kafka += 1
|
published_to_kafka += 1
|
||||||
|
|
||||||
log.debug('Published to topic {}, message {}'.format(topic, message.to_json()))
|
log.debug('Published to topic {}, message {}'.format(topic, message.to_json()))
|
||||||
|
@ -40,10 +40,10 @@ class NotificationProcessor(BaseProcessor):
|
|||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
if failed > 0:
|
if failed:
|
||||||
sent_failed_count.increment(failed)
|
sent_failed_count.increment(len(failed))
|
||||||
|
|
||||||
if invalid > 0:
|
if invalid:
|
||||||
invalid_type_count.increment(invalid)
|
invalid_type_count.increment(len(invalid))
|
||||||
|
|
||||||
return sent, failed
|
return sent, failed
|
||||||
|
98
monasca_notification/retry_engine.py
Normal file
98
monasca_notification/retry_engine.py
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import monascastatsd
|
||||||
|
import time
|
||||||
|
|
||||||
|
from notification import Notification
|
||||||
|
from processors.base import BaseProcessor
|
||||||
|
from processors.kafka_consumer import KafkaConsumer
|
||||||
|
from processors.kafka_producer import KafkaProducer
|
||||||
|
from processors.notification_processor import NotificationProcessor
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RetryEngine(object):
|
||||||
|
def __init__(self, config):
|
||||||
|
self._retry_interval = config['retry']['interval']
|
||||||
|
self._retry_max = config['retry']['max_attempts']
|
||||||
|
|
||||||
|
self._topics = {}
|
||||||
|
self._topics['notification_topic'] = config['kafka']['notification_topic']
|
||||||
|
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
|
||||||
|
|
||||||
|
self._statsd = monascastatsd.Client(name='monasca',
|
||||||
|
dimensions=BaseProcessor.dimensions)
|
||||||
|
|
||||||
|
self._consumer = KafkaConsumer(config['kafka']['url'],
|
||||||
|
config['zookeeper']['url'],
|
||||||
|
config['zookeeper']['notification_retry_path'],
|
||||||
|
config['kafka']['group'],
|
||||||
|
config['kafka']['notification_retry_topic'])
|
||||||
|
|
||||||
|
self._producer = KafkaProducer(config['kafka']['url'])
|
||||||
|
|
||||||
|
self._notifier = NotificationProcessor(config['notification_types'])
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
for raw_notification in self._consumer:
|
||||||
|
partition = raw_notification[0]
|
||||||
|
offset = raw_notification[1].offset
|
||||||
|
message = raw_notification[1].message.value
|
||||||
|
|
||||||
|
notification_data = json.loads(message)
|
||||||
|
|
||||||
|
ntype = notification_data['type']
|
||||||
|
name = notification_data['name']
|
||||||
|
addr = notification_data['address']
|
||||||
|
|
||||||
|
notification = Notification(ntype,
|
||||||
|
partition,
|
||||||
|
offset,
|
||||||
|
name,
|
||||||
|
addr,
|
||||||
|
notification_data['retry_count'],
|
||||||
|
notification_data['raw_alarm'])
|
||||||
|
|
||||||
|
wait_duration = self._retry_interval - (
|
||||||
|
time.time() - notification_data['notification_timestamp'])
|
||||||
|
|
||||||
|
if wait_duration > 0:
|
||||||
|
time.sleep(wait_duration)
|
||||||
|
|
||||||
|
sent, failed = self._notifier.send([notification])
|
||||||
|
|
||||||
|
if sent:
|
||||||
|
self._producer.publish(self._topics['notification_topic'], sent)
|
||||||
|
|
||||||
|
if failed:
|
||||||
|
notification.retry_count += 1
|
||||||
|
notification.notification_timestamp = time.time()
|
||||||
|
if notification.retry_count < self._retry_max:
|
||||||
|
log.error("retry failed for {} with name {} "
|
||||||
|
"at {}. "
|
||||||
|
"Saving for later retry.".format(ntype, name, addr))
|
||||||
|
self._producer.publish(self._topics['retry_topic'],
|
||||||
|
[notification])
|
||||||
|
else:
|
||||||
|
log.error("retry failed for {} with name {} "
|
||||||
|
"at {} after {} retries. "
|
||||||
|
"Giving up on retry."
|
||||||
|
.format(ntype, name, addr, self._retry_max))
|
||||||
|
|
||||||
|
self._consumer.commit([partition])
|
@ -15,7 +15,6 @@
|
|||||||
|
|
||||||
import email.mime.text
|
import email.mime.text
|
||||||
import smtplib
|
import smtplib
|
||||||
import sys
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from abstract_notifier import AbstractNotifier
|
from abstract_notifier import AbstractNotifier
|
||||||
@ -96,8 +95,7 @@ class EmailNotifier(AbstractNotifier):
|
|||||||
|
|
||||||
self._smtp = smtp
|
self._smtp = smtp
|
||||||
except Exception:
|
except Exception:
|
||||||
self._log.exception("Unable to connect to email server. Exiting.")
|
self._log.exception("Unable to connect to email server.")
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def _create_msg(self, hostname, notification):
|
def _create_msg(self, hostname, notification):
|
||||||
"""Create two kind of messages:
|
"""Create two kind of messages:
|
||||||
|
@ -64,27 +64,28 @@ def config(config):
|
|||||||
|
|
||||||
def send_notifications(notifications):
|
def send_notifications(notifications):
|
||||||
sent = []
|
sent = []
|
||||||
failed_count = 0
|
failed = []
|
||||||
invalid_count = 0
|
invalid = []
|
||||||
|
|
||||||
for notification in notifications:
|
for notification in notifications:
|
||||||
ntype = notification.type
|
ntype = notification.type
|
||||||
if ntype not in configured_notifiers:
|
if ntype not in configured_notifiers:
|
||||||
log.warn("attempting to send unconfigured notification: {}".format(ntype))
|
log.warn("attempting to send unconfigured notification: {}".format(ntype))
|
||||||
invalid_count += 1
|
invalid.append(notification)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
notification.notification_timestamp = time.time()
|
||||||
|
|
||||||
with statsd_timer.time(ntype + '_time'):
|
with statsd_timer.time(ntype + '_time'):
|
||||||
result = send_single_notification(notification)
|
result = send_single_notification(notification)
|
||||||
|
|
||||||
if result:
|
if result:
|
||||||
notification.notification_timestamp = time.time()
|
|
||||||
sent.append(notification)
|
sent.append(notification)
|
||||||
statsd_counter[ntype].increment(1)
|
statsd_counter[ntype].increment(1)
|
||||||
else:
|
else:
|
||||||
failed_count += 1
|
failed.append(notification)
|
||||||
|
|
||||||
return (sent, failed_count, invalid_count)
|
return (sent, failed, invalid)
|
||||||
|
|
||||||
|
|
||||||
def send_single_notification(notification):
|
def send_single_notification(notification):
|
||||||
|
@ -3,6 +3,7 @@ kafka:
|
|||||||
group: monasca-notification
|
group: monasca-notification
|
||||||
alarm_topic: alarm-state-transitions
|
alarm_topic: alarm-state-transitions
|
||||||
notification_topic: alarm-notifications
|
notification_topic: alarm-notifications
|
||||||
|
notification_retry_topic: retry-notifications
|
||||||
max_offset_lag: 600 # In seconds, undefined for none
|
max_offset_lag: 600 # In seconds, undefined for none
|
||||||
|
|
||||||
mysql:
|
mysql:
|
||||||
@ -36,6 +37,10 @@ processors:
|
|||||||
notification:
|
notification:
|
||||||
number: 4
|
number: 4
|
||||||
|
|
||||||
|
retry:
|
||||||
|
interval: 30
|
||||||
|
max_attempts: 5
|
||||||
|
|
||||||
queues:
|
queues:
|
||||||
alarms_size: 256
|
alarms_size: 256
|
||||||
finished_size: 256
|
finished_size: 256
|
||||||
@ -44,6 +49,8 @@ queues:
|
|||||||
|
|
||||||
zookeeper:
|
zookeeper:
|
||||||
url: 192.168.10.4:2181 # or comma seperated list of multiple hosts
|
url: 192.168.10.4:2181 # or comma seperated list of multiple hosts
|
||||||
|
notification_path: /notification/alarms
|
||||||
|
notification_retry_path: /notification/retry
|
||||||
|
|
||||||
logging: # Used in logging.dictConfig
|
logging: # Used in logging.dictConfig
|
||||||
version: 1
|
version: 1
|
||||||
|
@ -116,7 +116,7 @@ class TestAlarmProcessor(unittest.TestCase):
|
|||||||
sql_response = [['test notification', 'EMAIL', 'me@here.com']]
|
sql_response = [['test notification', 'EMAIL', 'me@here.com']]
|
||||||
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
||||||
|
|
||||||
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', alarm_dict)
|
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.assertEqual(notifications, [test_notification])
|
self.assertEqual(notifications, [test_notification])
|
||||||
self.assertEqual(partition, 0)
|
self.assertEqual(partition, 0)
|
||||||
@ -132,8 +132,8 @@ class TestAlarmProcessor(unittest.TestCase):
|
|||||||
sql_response = [['test notification', 'EMAIL', 'me@here.com'], ['test notification2', 'EMAIL', 'me@here.com']]
|
sql_response = [['test notification', 'EMAIL', 'me@here.com'], ['test notification2', 'EMAIL', 'me@here.com']]
|
||||||
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
||||||
|
|
||||||
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', alarm_dict)
|
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', 0, alarm_dict)
|
||||||
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', alarm_dict)
|
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.assertEqual(notifications, [test_notification, test_notification2])
|
self.assertEqual(notifications, [test_notification, test_notification2])
|
||||||
self.assertEqual(partition, 0)
|
self.assertEqual(partition, 0)
|
||||||
|
@ -84,7 +84,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metric)
|
alarm_dict = alarm(metric)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
@ -133,9 +133,8 @@ class TestEmail(unittest.TestCase):
|
|||||||
return_value = self.trap.pop(0)
|
return_value = self.trap.pop(0)
|
||||||
self.assertTrue(return_value)
|
self.assertTrue(return_value)
|
||||||
|
|
||||||
@mock.patch('monasca_notification.types.email_notifier.sys')
|
|
||||||
@mock.patch('monasca_notification.types.email_notifier.smtplib')
|
@mock.patch('monasca_notification.types.email_notifier.smtplib')
|
||||||
def test_smtp_sendmail_failed_connection_twice(self, mock_smtp, mock_sys):
|
def test_smtp_sendmail_failed_connection_twice(self, mock_smtp):
|
||||||
"""Email that fails on smtp_connect twice
|
"""Email that fails on smtp_connect twice
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -171,17 +170,15 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
|
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
|
||||||
self.assertIn("Unable to connect to email server. Exiting.", self.trap)
|
self.assertIn("Unable to connect to email server.", self.trap)
|
||||||
self.assertTrue(mock_sys.exit.called)
|
|
||||||
|
|
||||||
@mock.patch('monasca_notification.types.email_notifier.sys')
|
|
||||||
@mock.patch('monasca_notification.types.email_notifier.smtplib')
|
@mock.patch('monasca_notification.types.email_notifier.smtplib')
|
||||||
def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp, mock_sys):
|
def test_smtp_sendmail_failed_connection_once_then_email(self, mock_smtp):
|
||||||
"""Email that fails on smtp_connect once then email
|
"""Email that fails on smtp_connect once then email
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -214,13 +211,13 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
|
self.assertIn("SMTP server disconnected. Will reconnect and retry message.", self.trap)
|
||||||
self.assertIn("Error sending Email Notification", self.trap)
|
self.assertIn("Error sending Email Notification", self.trap)
|
||||||
self.assertFalse(mock_sys.exit.called)
|
self.assertNotIn("Unable to connect to email server.", self.trap)
|
||||||
|
|
||||||
@mock.patch('monasca_notification.types.email_notifier.smtplib')
|
@mock.patch('monasca_notification.types.email_notifier.smtplib')
|
||||||
def test_smtp_sendmail_failed_connection_once(self, mock_smtp):
|
def test_smtp_sendmail_failed_connection_once(self, mock_smtp):
|
||||||
@ -254,7 +251,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
@ -294,7 +291,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
|
@ -29,9 +29,12 @@ def test_json():
|
|||||||
'newState': 'newState',
|
'newState': 'newState',
|
||||||
'tenantId': 'tenantId',
|
'tenantId': 'tenantId',
|
||||||
'metrics': 'cpu_util'}
|
'metrics': 'cpu_util'}
|
||||||
test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
|
'src_offset', 'name',
|
||||||
|
'address', 0, alarm)
|
||||||
|
|
||||||
expected_dict = {u'name': u'name',
|
expected_dict = {u'name': u'name',
|
||||||
|
u'type': u'ntype',
|
||||||
u'notification_timestamp': None,
|
u'notification_timestamp': None,
|
||||||
u'tenant_id': u'tenantId',
|
u'tenant_id': u'tenantId',
|
||||||
u'alarm_name': u'alarmName',
|
u'alarm_name': u'alarmName',
|
||||||
@ -39,7 +42,17 @@ def test_json():
|
|||||||
u'state': u'newState',
|
u'state': u'newState',
|
||||||
u'alarm_timestamp': u'timestamp',
|
u'alarm_timestamp': u'timestamp',
|
||||||
u'address': u'address',
|
u'address': u'address',
|
||||||
u'message': u'stateChangeReason'}
|
u'message': u'stateChangeReason',
|
||||||
|
u'retry_count': 0,
|
||||||
|
u'raw_alarm': {
|
||||||
|
u'alarmId': u'alarmId',
|
||||||
|
u'alarmName': u'alarmName',
|
||||||
|
u'timestamp': u'timestamp',
|
||||||
|
u'stateChangeReason': u'stateChangeReason',
|
||||||
|
u'newState': u'newState',
|
||||||
|
u'tenantId': u'tenantId',
|
||||||
|
u'metrics': u'cpu_util'}}
|
||||||
|
|
||||||
# Compare as dicts so ordering is not an issue
|
# Compare as dicts so ordering is not an issue
|
||||||
assert json.loads(test_notification.to_json()) == expected_dict
|
assert json.loads(test_notification.to_json()) == expected_dict
|
||||||
|
|
||||||
@ -52,8 +65,12 @@ def test_equal():
|
|||||||
'newState': 'newState',
|
'newState': 'newState',
|
||||||
'tenantId': 'tenantId',
|
'tenantId': 'tenantId',
|
||||||
'metrics': 'cpu_util'}
|
'metrics': 'cpu_util'}
|
||||||
test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
test_notification2 = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
|
'src_offset', 'name',
|
||||||
|
'address', 0, alarm)
|
||||||
|
test_notification2 = notification.Notification('ntype', 'src_partition',
|
||||||
|
'src_offset', 'name',
|
||||||
|
'address', 0, alarm)
|
||||||
|
|
||||||
assert(test_notification == test_notification2)
|
assert(test_notification == test_notification2)
|
||||||
|
|
||||||
@ -66,8 +83,12 @@ def test_unequal():
|
|||||||
'newState': 'newState',
|
'newState': 'newState',
|
||||||
'tenantId': 'tenantId',
|
'tenantId': 'tenantId',
|
||||||
'metrics': 'cpu_util'}
|
'metrics': 'cpu_util'}
|
||||||
test_notification = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
test_notification2 = notification.Notification('ntype', 'src_partition', 'src_offset', 'name', 'address', alarm)
|
'src_offset', 'name',
|
||||||
|
'address', 0, alarm)
|
||||||
|
test_notification2 = notification.Notification('ntype', 'src_partition',
|
||||||
|
'src_offset', 'name',
|
||||||
|
'address', 0, alarm)
|
||||||
test_notification2.alarm_id = None
|
test_notification2.alarm_id = None
|
||||||
|
|
||||||
assert(test_notification != test_notification2)
|
assert(test_notification != test_notification2)
|
||||||
|
@ -85,7 +85,7 @@ class TestNotificationProcessor(unittest.TestCase):
|
|||||||
"timestamp": time.time(),
|
"timestamp": time.time(),
|
||||||
"metrics": metric}
|
"metrics": metric}
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self._start_processor([notification])
|
self._start_processor([notification])
|
||||||
|
|
||||||
@ -98,7 +98,7 @@ class TestNotificationProcessor(unittest.TestCase):
|
|||||||
"""
|
"""
|
||||||
alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM",
|
alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM",
|
||||||
"stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util"}
|
"stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util"}
|
||||||
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', alarm_dict)
|
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self._start_processor([invalid_notification])
|
self._start_processor([invalid_notification])
|
||||||
|
|
||||||
|
@ -224,7 +224,7 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', alarm({})))
|
'me@here.com', 0, alarm({})))
|
||||||
|
|
||||||
notifiers.send_notifications(notifications)
|
notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
@ -250,13 +250,13 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', alarm({})))
|
'me@here.com', 0, alarm({})))
|
||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
self.assertEqual(sent, [])
|
self.assertEqual(sent, [])
|
||||||
self.assertEqual(failed, 1)
|
self.assertEqual(len(failed), 1)
|
||||||
self.assertEqual(invalid, 0)
|
self.assertEqual(invalid, [])
|
||||||
|
|
||||||
@mock.patch('monasca_notification.types.notifiers.email_notifier')
|
@mock.patch('monasca_notification.types.notifiers.email_notifier')
|
||||||
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
|
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
|
||||||
@ -279,13 +279,13 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('pagerduty', 0, 1,
|
notifications.append(Notification('pagerduty', 0, 1,
|
||||||
'pagerduty notification',
|
'pagerduty notification',
|
||||||
'me@here.com', alarm({})))
|
'me@here.com', 0, alarm({})))
|
||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
self.assertEqual(sent, [])
|
self.assertEqual(sent, [])
|
||||||
self.assertEqual(failed, 0)
|
self.assertEqual(failed, [])
|
||||||
self.assertEqual(invalid, 1)
|
self.assertEqual(len(invalid), 1)
|
||||||
|
|
||||||
self.assertIn("attempting to send unconfigured notification: pagerduty", self.trap)
|
self.assertIn("attempting to send unconfigured notification: pagerduty", self.trap)
|
||||||
|
|
||||||
@ -312,19 +312,19 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', alarm({})))
|
'me@here.com', 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'foo@here.com', alarm({})))
|
'foo@here.com', 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'bar@here.com', alarm({})))
|
'bar@here.com', 0, alarm({})))
|
||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
self.assertEqual(len(sent), 3)
|
self.assertEqual(len(sent), 3)
|
||||||
self.assertEqual(failed, 0)
|
self.assertEqual(failed, [])
|
||||||
self.assertEqual(invalid, 0)
|
self.assertEqual(invalid, [])
|
||||||
|
|
||||||
for n in sent:
|
for n in sent:
|
||||||
self.assertEqual(n.notification_timestamp, 42)
|
self.assertEqual(n.notification_timestamp, 42)
|
||||||
@ -348,13 +348,13 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', alarm({})))
|
'me@here.com', 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'foo@here.com', alarm({})))
|
'foo@here.com', 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'bar@here.com', alarm({})))
|
'bar@here.com', 0, alarm({})))
|
||||||
|
|
||||||
notifiers.send_notifications(notifications)
|
notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
|
@ -146,6 +146,7 @@ class TestWebhook(unittest.TestCase):
|
|||||||
1,
|
1,
|
||||||
'pagerduty notification',
|
'pagerduty notification',
|
||||||
'ABCDEF',
|
'ABCDEF',
|
||||||
|
0,
|
||||||
alarm_dict)
|
alarm_dict)
|
||||||
|
|
||||||
self.trap.put(pagerduty.send_notification(notification))
|
self.trap.put(pagerduty.send_notification(notification))
|
||||||
|
@ -81,7 +81,7 @@ class TestWebhook(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metric)
|
alarm_dict = alarm(metric)
|
||||||
|
|
||||||
notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', alarm_dict)
|
notification = Notification('webhook', 0, 1, 'webhook notification', 'me@here.com', 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.put(webhook.send_notification(notification))
|
self.trap.put(webhook.send_notification(notification))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user