Query database to see if the periodic notificiation changes

Also added notification id to notification object we construct in the
alarm processor

Change-Id: I6ccfffc9102bec8f670df13268e244cb0c998950
This commit is contained in:
Michael James Hoppal 2016-08-08 09:12:21 -06:00
parent 2106901f11
commit 063c4f1d1d
20 changed files with 244 additions and 136 deletions

View File

@ -14,7 +14,7 @@
class BaseRepo(object): class BaseRepo(object):
def __init__(self, config): def __init__(self, config):
self._find_alarm_action_sql = """SELECT name, type, address, period self._find_alarm_action_sql = """SELECT id, type, name, address, period
FROM alarm_action as aa FROM alarm_action as aa
JOIN notification_method as nm ON aa.action_id = nm.id JOIN notification_method as nm ON aa.action_id = nm.id
WHERE aa.alarm_definition_id = %s and aa.alarm_state = %s""" WHERE aa.alarm_definition_id = %s and aa.alarm_state = %s"""
@ -23,3 +23,6 @@ class BaseRepo(object):
WHERE alarm.id = %s""" WHERE alarm.id = %s"""
self._insert_notification_types_sql = """INSERT INTO notification_method_type (name) VALUES ( %s)""" self._insert_notification_types_sql = """INSERT INTO notification_method_type (name) VALUES ( %s)"""
self._find_all_notification_types_sql = """SELECT name from notification_method_type """ self._find_all_notification_types_sql = """SELECT name from notification_method_type """
self._get_notification_sql = """SELECT name, type, address, period
FROM notification_method
WHERE id = %s"""

View File

@ -59,7 +59,7 @@ class MysqlRepo(BaseRepo):
log.exception('MySQL connect failed %s', e) log.exception('MySQL connect failed %s', e)
raise raise
def fetch_notification(self, alarm): def fetch_notifications(self, alarm):
try: try:
if self._mysql is None: if self._mysql is None:
self._connect_to_mysql() self._connect_to_mysql()
@ -67,7 +67,7 @@ class MysqlRepo(BaseRepo):
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState'])) cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
for row in cur: for row in cur:
yield (row[1].lower(), row[0], row[2], row[3]) yield (row[0], row[1].lower(), row[2], row[3], row[4])
except pymysql.Error as e: except pymysql.Error as e:
self._mysql = None self._mysql = None
log.exception("Couldn't fetch alarms actions %s", e) log.exception("Couldn't fetch alarms actions %s", e)
@ -122,3 +122,19 @@ class MysqlRepo(BaseRepo):
self._mysql = None self._mysql = None
log.exception("Couldn't insert notification types %s", e) log.exception("Couldn't insert notification types %s", e)
raise exc.DatabaseException(e) raise exc.DatabaseException(e)
def get_notification(self, notification_id):
try:
if self._mysql is None:
self._connect_to_mysql()
cur = self._mysql.cursor()
cur.execute(self._get_notification_sql, notification_id)
row = cur.fetchone()
if row is None:
return None
else:
return [row[0], row[1].lower(), row[2], row[3]]
except pymysql.Error as e:
self._mysql = None
log.exception("Couldn't fetch the notification method %s", e)
raise exc.DatabaseException(e)

View File

@ -23,9 +23,10 @@ from __future__ import unicode_literals
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, String, Enum, DateTime, ForeignKey, Table from sqlalchemy import Column, String, Enum, DateTime, ForeignKey, Table
ALARM_STATES = ('UNDETERMINED', 'OK', 'ALARM')
def create_alarm_action_model(metadata=None): def create_alarm_action_model(metadata=None):
ALARM_STATES = ('UNDETERMINED', 'OK', 'ALARM')
return Table('alarm_action', metadata, return Table('alarm_action', metadata,
Column('action_id', Column('action_id',
String(36), ForeignKey('notification_method.id'), String(36), ForeignKey('notification_method.id'),
@ -49,3 +50,15 @@ def create_notification_method_model(metadata=None):
def create_notification_method_type_model(metadata=None): def create_notification_method_type_model(metadata=None):
return Table('notification_method_type', metadata, return Table('notification_method_type', metadata,
Column('name', String(20), primary_key=True)) Column('name', String(20), primary_key=True))
def create_alarm_model(metadata=None):
return Table('alarm', metadata,
Column('id', String(20), primary_key=True),
Column('alarm_definition_id', String(36)),
Column('state', Enum(*ALARM_STATES)),
Column('lifecycle_state', String(50)),
Column('link', String(512)),
Column('created_at', DateTime, default=lambda: datetime.utcnow()),
Column('updated_at', DateTime, onupdate=lambda: datetime.utcnow()),
Column('state_updated_at', DateTime))

View File

@ -31,17 +31,24 @@ class OrmRepo(object):
aa = models.create_alarm_action_model(metadata).alias('aa') aa = models.create_alarm_action_model(metadata).alias('aa')
nm = models.create_notification_method_model(metadata).alias('nm') nm = models.create_notification_method_model(metadata).alias('nm')
nmt = models.create_notification_method_type_model(metadata).alias('nmt') nmt = models.create_notification_method_type_model(metadata).alias('nmt')
a = models.create_alarm_model(metadata).alias('a')
self._orm_query = (select([nm.c.name, nm.c.type, nm.c.address, nm.c.periodic_interval]) self._orm_query = select([nm.c.id, nm.c.type, nm.c.name, nm.c.address, nm.c.period])\
.select_from(aa.join(nm, aa.c.action_id == nm.c.id)) .select_from(aa.join(nm, aa.c.action_id == nm.c.id))\
.where(and_(aa.c.alarm_definition_id == bindparam('alarm_definition_id'), .where(
aa.c.alarm_state == bindparam('alarm_state')))) and_(aa.c.alarm_definition_id == bindparam('alarm_definition_id'),
aa.c.alarm_state == bindparam('alarm_state')))
self._orm_get_alarm_state = select([a.c.state]).where(a.c.id == bindparam('alarm_id'))
self._orm_nmt_query = select([nmt.c.name]) self._orm_nmt_query = select([nmt.c.name])
self._orm_get_notification = select([nm.c.name, nm.c.type, nm.c.address, nm.c.period])\
.where(nm.c.id == bindparam('notification_id'))
self._orm = None self._orm = None
def fetch_notification(self, alarm): def fetch_notifications(self, alarm):
try: try:
with self._orm_engine.connect() as conn: with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_query)) log.debug('Orm query {%s}', str(self._orm_query))
@ -49,11 +56,24 @@ class OrmRepo(object):
alarm_definition_id=alarm['alarmDefinitionId'], alarm_definition_id=alarm['alarmDefinitionId'],
alarm_state=alarm['newState']) alarm_state=alarm['newState'])
return [(row[1].lower(), row[0], row[2], row[3]) for row in notifications] return [(row[0], row[1].lower(), row[2], row[3], row[4]) for row in notifications]
except DatabaseError as e: except DatabaseError as e:
log.exception("Couldn't fetch alarms actions %s", e) log.exception("Couldn't fetch alarms actions %s", e)
raise exc.DatabaseException(e) raise exc.DatabaseException(e)
def get_alarm_current_state(self, alarm_id):
try:
with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_get_alarm_state))
result = conn.execute(self._orm_get_alarm_state,
alarm_id=alarm_id)
row = result.fetchone()
state = row[0] if row is not None else None
return state
except DatabaseError as e:
log.exception("Couldn't fetch the current alarm state %s", e)
raise exc.DatabaseException(e)
def fetch_notification_method_types(self): def fetch_notification_method_types(self):
try: try:
with self._orm_engine.connect() as conn: with self._orm_engine.connect() as conn:
@ -72,6 +92,20 @@ class OrmRepo(object):
conn.execute(self.nmt.insert(), notification_type) conn.execute(self.nmt.insert(), notification_type)
except DatabaseError as e: except DatabaseError as e:
self._mysql = None
log.exception("Couldn't insert notification types %s", e) log.exception("Couldn't insert notification types %s", e)
raise exc.DatabaseException(e) raise exc.DatabaseException(e)
def get_notification(self, notification_id):
try:
with self._orm_engine.connect() as conn:
log.debug('Orm query {%s}', str(self._orm_get_notification))
result = conn.execute(self._orm_get_notification,
notification_id=notification_id)
notification = result.fetchone()
if notification is None:
return None
else:
return [notification[0], notification[1].lower(), notification[2], notification[3]]
except DatabaseError as e:
log.exception("Couldn't fetch the notification method %s", e)
raise exc.DatabaseException(e)

View File

@ -35,14 +35,14 @@ class PostgresqlRepo(BaseRepo):
log.exception('Pgsql connect failed %s', e) log.exception('Pgsql connect failed %s', e)
raise raise
def fetch_notification(self, alarm): def fetch_notifications(self, alarm):
try: try:
if self._pgsql is None: if self._pgsql is None:
self._connect_to_pgsql() self._connect_to_pgsql()
cur = self._pgsql.cursor() cur = self._pgsql.cursor()
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState'])) cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
for row in cur: for row in cur:
yield (row[1].lower(), row[0], row[2], row[3]) yield (row[0], row[1].lower(), row[2], row[3], row[4])
except psycopg2.Error as e: except psycopg2.Error as e:
log.exception("Couldn't fetch alarms actions %s", e) log.exception("Couldn't fetch alarms actions %s", e)
raise exc.DatabaseException(e) raise exc.DatabaseException(e)
@ -66,11 +66,9 @@ class PostgresqlRepo(BaseRepo):
self._connect_to_pgsql() self._connect_to_pgsql()
cur = self._pgsql.cursor() cur = self._pgsql.cursor()
cur.execute(self._find_all_notification_types_sql) cur.execute(self._find_all_notification_types_sql)
for row in cur: for row in cur:
yield (row[0]) yield (row[0])
except psycopg2.Error as e: except psycopg2.Error as e:
self._mysql = None
log.exception("Couldn't fetch notification types %s", e) log.exception("Couldn't fetch notification types %s", e)
raise exc.DatabaseException(e) raise exc.DatabaseException(e)
@ -80,8 +78,21 @@ class PostgresqlRepo(BaseRepo):
self._connect_to_pgsql() self._connect_to_pgsql()
cur = self._pgsql.cursor() cur = self._pgsql.cursor()
cur.executemany(self._insert_notification_types_sql, notification_types) cur.executemany(self._insert_notification_types_sql, notification_types)
except psycopg2.Error as e: except psycopg2.Error as e:
self._mysql = None
log.exception("Couldn't insert notification types %s", e) log.exception("Couldn't insert notification types %s", e)
raise exc.DatabaseException(e) raise exc.DatabaseException(e)
def get_notification(self, notification_id):
try:
if self._pgsql is None:
self._connect_to_pgsql()
cur = self._pgsql.cursor()
cur.execute(self._get_notification_sql, notification_id)
row = cur.fetchone()
if row is None:
return None
else:
return [row[0], row[1].lower(), row[2], row[3]]
except psycopg2.Error as e:
log.exception("Couldn't fetch the notification method %s", e)
raise exc.DatabaseException(e)

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2016 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -12,12 +12,60 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from monasca_common.simport import simport from monasca_common.simport import simport
from monasca_notification.common.repositories import exceptions
from monasca_notification.notification import Notification
log = logging.getLogger(__name__)
def get_db_repo(config): def get_db_repo(config):
if 'database' in config and 'repo_driver' in config['database']: if 'database' in config and 'repo_driver' in config['database']:
return simport.load(config['database']['repo_driver'])(config) return simport.load(config['database']['repo_driver'])(config)
else: else:
return simport.load('monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo')(config) return simport.load('monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo')(config)
def construct_notification_object(db_repo, notification_json):
try:
notification = Notification(notification_json['id'],
notification_json['type'],
notification_json['name'],
notification_json['address'],
notification_json['period'],
notification_json['retry_count'],
notification_json['raw_alarm'])
# Grab notification method from database to see if it was changed
stored_notification = grab_stored_notification_method(db_repo, notification.id)
# Notification method was deleted
if stored_notification is None:
log.debug("Notification method {0} was deleted from database. "
"Will stop sending.".format(notification.id))
return None
# Update notification method with most up to date values
else:
notification.name = stored_notification[0]
notification.type = stored_notification[1]
notification.address = stored_notification[2]
notification.period = stored_notification[3]
return notification
except exceptions.DatabaseException:
log.warn("Error querying mysql for notification method. "
"Using currently cached method.")
return notification
except Exception as e:
log.warn("Error when attempting to construct notification {0}".format(e))
return None
def grab_stored_notification_method(db_repo, notification_id):
try:
stored_notification = db_repo.get_notification(notification_id)
except exceptions.DatabaseException:
log.debug('Database Error. Attempting reconnect')
stored_notification = db_repo.get_notification(notification_id)
return stored_notification

View File

@ -1,4 +1,4 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -25,11 +25,10 @@ class Notification(object):
'alarm_id', 'alarm_id',
'alarm_name', 'alarm_name',
'alarm_timestamp', 'alarm_timestamp',
'id',
'message', 'message',
'name', 'name',
'notification_timestamp', 'notification_timestamp',
'src_partition',
'src_offset',
'state', 'state',
'severity', 'severity',
'link', 'link',
@ -43,25 +42,22 @@ class Notification(object):
'periodic_topic' 'periodic_topic'
) )
def __init__(self, ntype, src_partition, src_offset, name, address, period, def __init__(self, id, type, name, address, period, retry_count, alarm):
retry_count, alarm):
"""Setup the notification object """Setup the notification object
The src_partition and src_offset allow the notification id - The notification id
to be linked to the alarm which triggered it. type - The notification type
ntype - The notification type
name - Name used in sending name - Name used in sending
address - where to send the notification address - where to send the notification
period - period of sending the notification period - period of sending the notificationv
retry_count - number of times we've tried to send retry_count - number of times we've tried to send
alarm - info that caused the notification alarm - info that caused the notification
notifications that come after this one to remain uncommitted. notifications that come after this one to remain uncommitted.
Note that data may include unicode strings. Note that data may include unicode strings.
""" """
self.id = id
self.address = address self.address = address
self.name = name self.name = name
self.src_partition = src_partition self.type = type
self.src_offset = src_offset
self.type = ntype
self.retry_count = retry_count self.retry_count = retry_count
self.raw_alarm = alarm self.raw_alarm = alarm
@ -99,6 +95,7 @@ class Notification(object):
"""Return json representation """Return json representation
""" """
notification_fields = [ notification_fields = [
'id',
'type', 'type',
'name', 'name',
'address', 'address',

View File

@ -48,7 +48,7 @@ class NotificationEngine(object):
def _add_periodic_notifications(self, notifications): def _add_periodic_notifications(self, notifications):
for notification in notifications: for notification in notifications:
topic = notification.periodic_topic topic = notification.periodic_topic
if topic in self._config['kafka']['periodic']: if topic in self._config['kafka']['periodic'] and notification.type == "webhook":
notification.notification_timestamp = time.time() notification.notification_timestamp = time.time()
self._producer.publish(self._config['kafka']['periodic'][topic], self._producer.publish(self._config['kafka']['periodic'][topic],
[notification.to_json()]) [notification.to_json()])

View File

@ -21,8 +21,8 @@ import time
from monasca_common.kafka.consumer import KafkaConsumer from monasca_common.kafka.consumer import KafkaConsumer
from monasca_common.kafka.producer import KafkaProducer from monasca_common.kafka.producer import KafkaProducer
from monasca_notification.common.repositories import exceptions from monasca_notification.common.repositories import exceptions
from monasca_notification.common.utils import construct_notification_object
from monasca_notification.common.utils import get_db_repo from monasca_notification.common.utils import get_db_repo
from notification import Notification
from processors.base import BaseProcessor from processors.base import BaseProcessor
from processors.notification_processor import NotificationProcessor from processors.notification_processor import NotificationProcessor
@ -30,13 +30,13 @@ log = logging.getLogger(__name__)
class PeriodicEngine(object): class PeriodicEngine(object):
def __init__(self, config, interval): def __init__(self, config, period):
self._topic_name = config['kafka']['periodic'][interval] self._topic_name = config['kafka']['periodic'][period]
self._statsd = monascastatsd.Client(name='monasca', self._statsd = monascastatsd.Client(name='monasca',
dimensions=BaseProcessor.dimensions) dimensions=BaseProcessor.dimensions)
zookeeper_path = config['zookeeper']['periodic_path'][interval] zookeeper_path = config['zookeeper']['periodic_path'][period]
self._consumer = KafkaConsumer(config['kafka']['url'], self._consumer = KafkaConsumer(config['kafka']['url'],
config['zookeeper']['url'], config['zookeeper']['url'],
zookeeper_path, zookeeper_path,
@ -47,46 +47,46 @@ class PeriodicEngine(object):
self._notifier = NotificationProcessor(config) self._notifier = NotificationProcessor(config)
self._db_repo = get_db_repo(config) self._db_repo = get_db_repo(config)
self._period = period
def _keep_sending(self, alarm_id, original_state): def _keep_sending(self, alarm_id, original_state, type, period):
# Go to DB and check alarm state
try: try:
current_state = self._db_repo.get_alarm_current_state(alarm_id) current_state = self._db_repo.get_alarm_current_state(alarm_id)
except exceptions.DatabaseException: except exceptions.DatabaseException:
log.debug('Database Error. Attempting reconnect') log.debug('Database Error. Attempting reconnect')
current_state = self._db_repo.get_alarm_current_state(alarm_id) current_state = self._db_repo.get_alarm_current_state(alarm_id)
# Alarm was deleted # Alarm was deleted
if current_state is None: if current_state is None:
return False return False
# Alarm state changed # Alarm state changed
if current_state != original_state: if current_state != original_state:
return False return False
# Period changed
if period != self._period:
return False
if type != "webhook":
return False
return True return True
def run(self): def run(self):
for raw_notification in self._consumer: for raw_notification in self._consumer:
partition = raw_notification[0]
offset = raw_notification[1].offset
message = raw_notification[1].message.value message = raw_notification[1].message.value
notification_data = json.loads(message) notification_data = json.loads(message)
ntype = notification_data['type'] notification = construct_notification_object(self._db_repo, notification_data)
name = notification_data['name']
addr = notification_data['address']
period = notification_data['period']
notification = Notification(ntype, if notification is None:
partition, self._consumer.commit()
offset, continue
name,
addr,
period,
notification_data['retry_count'],
notification_data['raw_alarm'])
if self._keep_sending(notification.alarm_id, notification.state): if self._keep_sending(notification.alarm_id,
wait_duration = period - ( notification.state,
notification.type,
notification.period):
wait_duration = notification.period - (
time.time() - notification_data['notification_timestamp']) time.time() - notification_data['notification_timestamp'])
if wait_duration > 0: if wait_duration > 0:

View File

@ -1,4 +1,4 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -78,18 +78,17 @@ class AlarmProcessor(BaseProcessor):
return True return True
def _build_notification(self, partition, offset, alarm): def _build_notification(self, alarm):
db_time = self._statsd.get_timer() db_time = self._statsd.get_timer()
with db_time.time('config_db_time'): with db_time.time('config_db_time'):
alarms_actions = self._db_repo.fetch_notification(alarm) alarms_actions = self._db_repo.fetch_notifications(alarm)
return [Notification(alarms_action[0], return [Notification(alarms_action[0],
partition,
offset,
alarms_action[1], alarms_action[1],
alarms_action[2], alarms_action[2],
alarms_action[3], alarms_action[3],
alarms_action[4],
0, 0,
alarm) for alarms_action in alarms_actions] alarm) for alarms_action in alarms_actions]
@ -117,10 +116,10 @@ class AlarmProcessor(BaseProcessor):
return [], partition, offset return [], partition, offset
try: try:
notifications = self._build_notification(partition, offset, alarm) notifications = self._build_notification(alarm)
except exc.DatabaseException: except exc.DatabaseException:
log.debug('Database Error. Attempting reconnect') log.debug('Database Error. Attempting reconnect')
notifications = self._build_notification(partition, offset, alarm) notifications = self._build_notification(alarm)
if len(notifications) == 0: if len(notifications) == 0:
no_notification_count += 1 no_notification_count += 1

View File

@ -20,7 +20,8 @@ import time
from monasca_common.kafka.consumer import KafkaConsumer from monasca_common.kafka.consumer import KafkaConsumer
from monasca_common.kafka.producer import KafkaProducer from monasca_common.kafka.producer import KafkaProducer
from notification import Notification from monasca_notification.common.utils import construct_notification_object
from monasca_notification.common.utils import get_db_repo
from processors.base import BaseProcessor from processors.base import BaseProcessor
from processors.notification_processor import NotificationProcessor from processors.notification_processor import NotificationProcessor
@ -48,28 +49,19 @@ class RetryEngine(object):
self._producer = KafkaProducer(config['kafka']['url']) self._producer = KafkaProducer(config['kafka']['url'])
self._notifier = NotificationProcessor(config) self._notifier = NotificationProcessor(config)
self._db_repo = get_db_repo(config)
def run(self): def run(self):
for raw_notification in self._consumer: for raw_notification in self._consumer:
partition = raw_notification[0]
offset = raw_notification[1].offset
message = raw_notification[1].message.value message = raw_notification[1].message.value
notification_data = json.loads(message) notification_data = json.loads(message)
ntype = notification_data['type'] notification = construct_notification_object(self._db_repo, notification_data)
name = notification_data['name']
addr = notification_data['address']
period = notification_data['period']
notification = Notification(ntype, if notification is None:
partition, self._consumer.commit()
offset, continue
name,
addr,
period,
notification_data['retry_count'],
notification_data['raw_alarm'])
wait_duration = self._retry_interval - ( wait_duration = self._retry_interval - (
time.time() - notification_data['notification_timestamp']) time.time() - notification_data['notification_timestamp'])
@ -89,13 +81,18 @@ class RetryEngine(object):
if notification.retry_count < self._retry_max: if notification.retry_count < self._retry_max:
log.error(u"retry failed for {} with name {} " log.error(u"retry failed for {} with name {} "
u"at {}. " u"at {}. "
u"Saving for later retry.".format(ntype, name, addr)) u"Saving for later retry.".format(notification.type,
notification.name,
notification.address))
self._producer.publish(self._topics['retry_topic'], self._producer.publish(self._topics['retry_topic'],
[notification.to_json()]) [notification.to_json()])
else: else:
log.error(u"retry failed for {} with name {} " log.error(u"retry failed for {} with name {} "
u"at {} after {} retries. " u"at {} after {} retries. "
u"Giving up on retry." u"Giving up on retry."
.format(ntype, name, addr, self._retry_max)) .format(notification.type,
notification.name,
notification.address,
self._retry_max))
self._consumer.commit() self._consumer.commit()

View File

@ -57,11 +57,12 @@ def enabled_notifications():
def config(config): def config(config):
formatted_config = {type.lower(): value for type, value in config.iteritems()}
for notifier in possible_notifiers: for notifier in possible_notifiers:
ntype = notifier.type ntype = notifier.type.lower()
if ntype in config: if ntype in formatted_config:
try: try:
notifier.config(config[ntype]) notifier.config(formatted_config[ntype])
configured_notifiers[ntype] = notifier configured_notifiers[ntype] = notifier
statsd_counter[ntype] = statsd.get_counter(notifier.statsd_name) statsd_counter[ntype] = statsd.get_counter(notifier.statsd_name)
log.info("{} notification ready".format(ntype)) log.info("{} notification ready".format(ntype))
@ -69,6 +70,9 @@ def config(config):
log.exception("config exception for {}".format(ntype)) log.exception("config exception for {}".format(ntype))
else: else:
log.warn("No config data for type: {}".format(ntype)) log.warn("No config data for type: {}".format(ntype))
config_with_no_notifiers = set(formatted_config.keys()) - set(configured_notifiers.keys())
if config_with_no_notifiers:
log.warn("No notifiers found for {0}". format(", ".join(config_with_no_notifiers)))
def send_notifications(notifications): def send_notifications(notifications):
@ -94,7 +98,7 @@ def send_notifications(notifications):
else: else:
failed.append(notification) failed.append(notification)
return (sent, failed, invalid) return sent, failed, invalid
def send_single_notification(notification): def send_single_notification(notification):

View File

@ -1,4 +1,4 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -124,10 +124,10 @@ class TestAlarmProcessor(unittest.TestCase):
"severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"} "severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"}
alarm = self._create_raw_alarm(0, 4, alarm_dict) alarm = self._create_raw_alarm(0, 4, alarm_dict)
sql_response = [['test notification', 'EMAIL', 'me@here.com', 0]] sql_response = [[1, 'EMAIL', 'test notification', 'me@here.com', 0]]
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response) notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', 0, 0, alarm_dict) test_notification = Notification(1, 'email', 'test notification', 'me@here.com', 0, 0, alarm_dict)
self.assertEqual(notifications, [test_notification]) self.assertEqual(notifications, [test_notification])
self.assertEqual(partition, 0) self.assertEqual(partition, 0)
@ -141,12 +141,12 @@ class TestAlarmProcessor(unittest.TestCase):
alarm = self._create_raw_alarm(0, 5, alarm_dict) alarm = self._create_raw_alarm(0, 5, alarm_dict)
sql_response = [['test notification', 'EMAIL', 'me@here.com', 0], sql_response = [[1, 'EMAIL', 'test notification', 'me@here.com', 0],
['test notification2', 'EMAIL', 'me@here.com', 0]] [2, 'EMAIL', 'test notification2', 'me@here.com', 0]]
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response) notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', 0, 0, alarm_dict) test_notification = Notification(1, 'email', 'test notification', 'me@here.com', 0, 0, alarm_dict)
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', 0, 0, alarm_dict) test_notification2 = Notification(2, 'email', 'test notification2', 'me@here.com', 0, 0, alarm_dict)
self.assertEqual(notifications, [test_notification, test_notification2]) self.assertEqual(notifications, [test_notification, test_notification2])
self.assertEqual(partition, 0) self.assertEqual(partition, 0)

View File

@ -104,7 +104,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metric) alarm_dict = alarm(metric)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
self.trap.append(email.send_notification(notification)) self.trap.append(email.send_notification(notification))
@ -222,7 +222,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics) alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
self.trap.append(email.send_notification(notification)) self.trap.append(email.send_notification(notification))
@ -268,7 +268,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics) alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
email_result = email.send_notification(notification) email_result = email.send_notification(notification)
@ -311,7 +311,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics) alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
self.trap.append(email.send_notification(notification)) self.trap.append(email.send_notification(notification))
@ -351,7 +351,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics) alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
self.trap.append(email.send_notification(notification)) self.trap.append(email.send_notification(notification))
@ -391,7 +391,7 @@ class TestEmail(unittest.TestCase):
alarm_dict = alarm(metrics) alarm_dict = alarm(metrics)
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
self.trap.append(email.send_notification(notification)) self.trap.append(email.send_notification(notification))

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2015 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -44,7 +44,7 @@ class TestMySqlRepo(unittest.TestCase):
'newState': 'bar'} 'newState': 'bar'}
def get_notification(repo, alarm): def get_notification(repo, alarm):
g = repo.fetch_notification(alarm) g = repo.fetch_notifications(alarm)
for x in g: for x in g:
pass pass

View File

@ -1,4 +1,4 @@
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP # (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -33,11 +33,11 @@ def test_json():
"lifecycleState": "OPEN", "lifecycleState": "OPEN",
'tenantId': 'tenantId', 'tenantId': 'tenantId',
'metrics': 'cpu_util'} 'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', test_notification = notification.Notification(1, 'ntype', 'name',
'src_offset', 'name',
'address', 0, 0, alarm) 'address', 0, 0, alarm)
expected_dict = {u'name': u'name', expected_dict = {u'id': 1,
u'name': u'name',
u'type': u'ntype', u'type': u'ntype',
u'notification_timestamp': None, u'notification_timestamp': None,
u'tenant_id': u'tenantId', u'tenant_id': u'tenantId',
@ -83,11 +83,11 @@ def test_json_non_zero_period():
"lifecycleState": "OPEN", "lifecycleState": "OPEN",
'tenantId': 'tenantId', 'tenantId': 'tenantId',
'metrics': 'cpu_util'} 'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', test_notification = notification.Notification(1, 'ntype', 'name',
'src_offset', 'name',
'address', 60, 0, alarm) 'address', 60, 0, alarm)
expected_dict = {u'name': u'name', expected_dict = {u'id': 1,
u'name': u'name',
u'type': u'ntype', u'type': u'ntype',
u'notification_timestamp': None, u'notification_timestamp': None,
u'tenant_id': u'tenantId', u'tenant_id': u'tenantId',
@ -130,11 +130,9 @@ def test_equal():
"lifecycleState": "OPEN", "lifecycleState": "OPEN",
'tenantId': 'tenantId', 'tenantId': 'tenantId',
'metrics': 'cpu_util'} 'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', test_notification = notification.Notification(0, 'ntype', 'name',
'src_offset', 'name',
'address', 0, 0, alarm) 'address', 0, 0, alarm)
test_notification2 = notification.Notification('ntype', 'src_partition', test_notification2 = notification.Notification(0, 'ntype', 'name',
'src_offset', 'name',
'address', 0, 0, alarm) 'address', 0, 0, alarm)
assert(test_notification == test_notification2) assert(test_notification == test_notification2)
@ -151,11 +149,9 @@ def test_unequal():
"lifecycleState": "OPEN", "lifecycleState": "OPEN",
'tenantId': 'tenantId', 'tenantId': 'tenantId',
'metrics': 'cpu_util'} 'metrics': 'cpu_util'}
test_notification = notification.Notification('ntype', 'src_partition', test_notification = notification.Notification(0, 'ntype', 'name',
'src_offset', 'name',
'address', 0, 0, alarm) 'address', 0, 0, alarm)
test_notification2 = notification.Notification('ntype', 'src_partition', test_notification2 = notification.Notification(1, 'ntype', 'name',
'src_offset', 'name',
'address', 0, 0, alarm) 'address', 0, 0, alarm)
test_notification2.alarm_id = None test_notification2.alarm_id = None

View File

@ -97,7 +97,7 @@ class TestNotificationProcessor(unittest.TestCase):
"timestamp": time.time(), "timestamp": time.time(),
"metrics": metric} "metrics": metric}
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict) notification = Notification(0, 'email', 'email notification', 'me@here.com', 0, 0, alarm_dict)
self._start_processor([notification]) self._start_processor([notification])
@ -111,7 +111,7 @@ class TestNotificationProcessor(unittest.TestCase):
alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM", alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM",
"stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util", "stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util",
"severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"} "severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"}
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', 0, 0, alarm_dict) invalid_notification = Notification(0, 'invalid', 'test notification', 'me@here.com', 0, 0, alarm_dict)
self._start_processor([invalid_notification]) self._start_processor([invalid_notification])

View File

@ -225,8 +225,7 @@ class TestInterface(unittest.TestCase):
notifiers.config(config_dict) notifiers.config(config_dict)
notifications = [] notifications = []
notifications.append(Notification('email', 0, 1, notifications.append(Notification(0, 'email', 'email notification',
'email notification',
'me@here.com', 0, 0, alarm({}))) 'me@here.com', 0, 0, alarm({})))
notifiers.send_notifications(notifications) notifiers.send_notifications(notifications)
@ -251,8 +250,7 @@ class TestInterface(unittest.TestCase):
notifiers.config(config_dict) notifiers.config(config_dict)
notifications = [] notifications = []
notifications.append(Notification('email', 0, 1, notifications.append(Notification(0, 'email', 'email notification',
'email notification',
'me@here.com', 0, 0, alarm({}))) 'me@here.com', 0, 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications) sent, failed, invalid = notifiers.send_notifications(notifications)
@ -280,8 +278,7 @@ class TestInterface(unittest.TestCase):
self.assertIn("No config data for type: pagerduty", self.trap) self.assertIn("No config data for type: pagerduty", self.trap)
notifications = [] notifications = []
notifications.append(Notification('pagerduty', 0, 1, notifications.append(Notification(0, 'pagerduty', 'pagerduty notification',
'pagerduty notification',
'me@here.com', 0, 0, alarm({}))) 'me@here.com', 0, 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications) sent, failed, invalid = notifiers.send_notifications(notifications)
@ -313,14 +310,11 @@ class TestInterface(unittest.TestCase):
notifiers.config(config_dict) notifiers.config(config_dict)
notifications = [] notifications = []
notifications.append(Notification('email', 0, 1, notifications.append(Notification(0, 'email', 'email notification',
'email notification',
'me@here.com', 0, 0, alarm({}))) 'me@here.com', 0, 0, alarm({})))
notifications.append(Notification('email', 0, 1, notifications.append(Notification(1, 'email', 'email notification',
'email notification',
'foo@here.com', 0, 0, alarm({}))) 'foo@here.com', 0, 0, alarm({})))
notifications.append(Notification('email', 0, 1, notifications.append(Notification(2, 'email', 'email notification',
'email notification',
'bar@here.com', 0, 0, alarm({}))) 'bar@here.com', 0, 0, alarm({})))
sent, failed, invalid = notifiers.send_notifications(notifications) sent, failed, invalid = notifiers.send_notifications(notifications)
@ -349,14 +343,11 @@ class TestInterface(unittest.TestCase):
notifiers.config(config_dict) notifiers.config(config_dict)
notifications = [] notifications = []
notifications.append(Notification('email', 0, 1, notifications.append(Notification(0, 'email', 'email notification',
'email notification',
'me@here.com', 0, 0, alarm({}))) 'me@here.com', 0, 0, alarm({})))
notifications.append(Notification('email', 0, 1, notifications.append(Notification(1, 'email', 'email notification',
'email notification',
'foo@here.com', 0, 0, alarm({}))) 'foo@here.com', 0, 0, alarm({})))
notifications.append(Notification('email', 0, 1, notifications.append(Notification(2, 'email', 'email notification',
'email notification',
'bar@here.com', 0, 0, alarm({}))) 'bar@here.com', 0, 0, alarm({})))
notifiers.send_notifications(notifications) notifiers.send_notifications(notifications)

View File

@ -144,9 +144,8 @@ class TestWebhook(unittest.TestCase):
alarm_dict = alarm(metric) alarm_dict = alarm(metric)
notification = Notification('pagerduty', notification = Notification(0,
0, 'pagerduty',
1,
'pagerduty notification', 'pagerduty notification',
'ABCDEF', 'ABCDEF',
0, 0,

View File

@ -86,7 +86,7 @@ class TestWebhook(unittest.TestCase):
alarm_dict = alarm(metric) alarm_dict = alarm(metric)
notification = Notification('webhook', 0, 1, 'webhook notification', 'http://mock:3333/', 0, 0, alarm_dict) notification = Notification(0, 'webhook', 'webhook notification', 'http://mock:3333/', 0, 0, alarm_dict)
self.trap.put(webhook.send_notification(notification)) self.trap.put(webhook.send_notification(notification))