Adding support for periodic notifications
New periodic notification engine that will take a notification send it, and then place it back onto the periodic notification topic. Once the alarm associated with the notification has transitioned to a state that is different from the original state the notification is removed from the queue. Change-Id: Ie3103a0ec30abcd8dfc53869b1c3135953aabf3a
This commit is contained in:
parent
1fcb2fdf57
commit
79189ca811
@ -1,4 +1,5 @@
|
|||||||
# Copyright 2015 FUJITSU LIMITED
|
# Copyright 2015 FUJITSU LIMITED
|
||||||
|
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||||
# in compliance with the License. You may obtain a copy of the License at
|
# in compliance with the License. You may obtain a copy of the License at
|
||||||
@ -13,7 +14,10 @@
|
|||||||
|
|
||||||
class BaseRepo(object):
|
class BaseRepo(object):
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
self._find_alarm_action_sql = """SELECT name, type, address
|
self._find_alarm_action_sql = """SELECT name, type, address, period
|
||||||
FROM alarm_action as aa
|
FROM alarm_action as aa
|
||||||
JOIN notification_method as nm ON aa.action_id = nm.id
|
JOIN notification_method as nm ON aa.action_id = nm.id
|
||||||
WHERE aa.alarm_definition_id = %s and aa.alarm_state = %s"""
|
WHERE aa.alarm_definition_id = %s and aa.alarm_state = %s"""
|
||||||
|
self._find_alarm_state_sql = """SELECT state
|
||||||
|
FROM alarm
|
||||||
|
WHERE alarm.id = %s"""
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
# Copyright 2015 FUJITSU LIMITED
|
# Copyright 2015 FUJITSU LIMITED
|
||||||
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||||
# in compliance with the License. You may obtain a copy of the License at
|
# in compliance with the License. You may obtain a copy of the License at
|
||||||
@ -63,13 +63,26 @@ class MysqlRepo(BaseRepo):
|
|||||||
try:
|
try:
|
||||||
if self._mysql is None:
|
if self._mysql is None:
|
||||||
self._connect_to_mysql()
|
self._connect_to_mysql()
|
||||||
|
|
||||||
cur = self._mysql.cursor()
|
cur = self._mysql.cursor()
|
||||||
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
|
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
|
||||||
|
|
||||||
for row in cur:
|
for row in cur:
|
||||||
yield (row[1].lower(), row[0], row[2])
|
yield (row[1].lower(), row[0], row[2], row[3])
|
||||||
except pymysql.Error as e:
|
except pymysql.Error as e:
|
||||||
self._mysql = None
|
self._mysql = None
|
||||||
log.exception("Couldn't fetch alarms actions %s", e)
|
log.exception("Couldn't fetch alarms actions %s", e)
|
||||||
raise exc.DatabaseException(e)
|
raise exc.DatabaseException(e)
|
||||||
|
|
||||||
|
def get_alarm_current_state(self, alarm_id):
|
||||||
|
try:
|
||||||
|
if self._mysql is None:
|
||||||
|
self._connect_to_mysql()
|
||||||
|
cur = self._mysql.cursor()
|
||||||
|
cur.execute(self._find_alarm_state_sql, alarm_id)
|
||||||
|
row = cur.fetchone()
|
||||||
|
state = row[0] if row is not None else None
|
||||||
|
return state
|
||||||
|
except pymysql.Error as e:
|
||||||
|
self._mysql = None
|
||||||
|
log.exception("Couldn't fetch the current alarm state %s", e)
|
||||||
|
raise exc.DatabaseException(e)
|
@ -1,6 +1,7 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
# Copyright 2015 Fujitsu Technology Solutions
|
# Copyright 2015 Fujitsu Technology Solutions
|
||||||
|
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -40,5 +41,6 @@ def create_notification_method_model(metadata=None):
|
|||||||
Column('name', String(250)),
|
Column('name', String(250)),
|
||||||
Column('tenant_id', String(36)),
|
Column('tenant_id', String(36)),
|
||||||
Column('type', String(255)),
|
Column('type', String(255)),
|
||||||
|
Column('period', int),
|
||||||
Column('created_at', DateTime, default=lambda: datetime.utcnow()),
|
Column('created_at', DateTime, default=lambda: datetime.utcnow()),
|
||||||
Column('updated_at', DateTime, onupdate=lambda: datetime.utcnow()))
|
Column('updated_at', DateTime, onupdate=lambda: datetime.utcnow()))
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
# Copyright 2015 FUJITSU LIMITED
|
# Copyright 2015 FUJITSU LIMITED
|
||||||
|
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||||
# in compliance with the License. You may obtain a copy of the License at
|
# in compliance with the License. You may obtain a copy of the License at
|
||||||
@ -30,7 +31,7 @@ class OrmRepo(object):
|
|||||||
aa = models.create_alarm_action_model(metadata).alias('aa')
|
aa = models.create_alarm_action_model(metadata).alias('aa')
|
||||||
nm = models.create_notification_method_model(metadata).alias('nm')
|
nm = models.create_notification_method_model(metadata).alias('nm')
|
||||||
|
|
||||||
self._orm_query = select([nm.c.name, nm.c.type, nm.c.address])\
|
self._orm_query = select([nm.c.name, nm.c.type, nm.c.address, nm.c.periodic_interval])\
|
||||||
.select_from(aa.join(nm, aa.c.action_id == nm.c.id))\
|
.select_from(aa.join(nm, aa.c.action_id == nm.c.id))\
|
||||||
.where(
|
.where(
|
||||||
and_(aa.c.alarm_definition_id == bindparam('alarm_definition_id'),
|
and_(aa.c.alarm_definition_id == bindparam('alarm_definition_id'),
|
||||||
@ -42,11 +43,11 @@ class OrmRepo(object):
|
|||||||
try:
|
try:
|
||||||
with self._orm_engine.connect() as conn:
|
with self._orm_engine.connect() as conn:
|
||||||
log.debug('Orm query {%s}', str(self._orm_query))
|
log.debug('Orm query {%s}', str(self._orm_query))
|
||||||
notifcations = conn.execute(self._orm_query,
|
notifications = conn.execute(self._orm_query,
|
||||||
alarm_definition_id=alarm['alarmDefinitionId'],
|
alarm_definition_id=alarm['alarmDefinitionId'],
|
||||||
alarm_state=alarm['newState'])
|
alarm_state=alarm['newState'])
|
||||||
|
|
||||||
return [(row[1].lower(), row[0], row[2]) for row in notifcations]
|
return [(row[1].lower(), row[0], row[2], row[3]) for row in notifications]
|
||||||
except DatabaseError as e:
|
except DatabaseError as e:
|
||||||
log.exception("Couldn't fetch alarms actions %s", e)
|
log.exception("Couldn't fetch alarms actions %s", e)
|
||||||
raise exc.DatabaseException(e)
|
raise exc.DatabaseException(e)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
# Copyright 2015 FUJITSU LIMITED
|
# Copyright 2015 FUJITSU LIMITED
|
||||||
|
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||||
# in compliance with the License. You may obtain a copy of the License at
|
# in compliance with the License. You may obtain a copy of the License at
|
||||||
@ -41,7 +42,20 @@ class PostgresqlRepo(BaseRepo):
|
|||||||
cur = self._pgsql.cursor()
|
cur = self._pgsql.cursor()
|
||||||
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
|
cur.execute(self._find_alarm_action_sql, (alarm['alarmDefinitionId'], alarm['newState']))
|
||||||
for row in cur:
|
for row in cur:
|
||||||
yield (row[1].lower(), row[0], row[2])
|
yield (row[1].lower(), row[0], row[2], row[3])
|
||||||
except psycopg2.Error as e:
|
except psycopg2.Error as e:
|
||||||
log.exception("Couldn't fetch alarms actions %s", e)
|
log.exception("Couldn't fetch alarms actions %s", e)
|
||||||
raise exc.DatabaseException(e)
|
raise exc.DatabaseException(e)
|
||||||
|
|
||||||
|
def get_alarm_current_state(self, alarm_id):
|
||||||
|
try:
|
||||||
|
if self._pgsql is None:
|
||||||
|
self._connect_to_pgsql()
|
||||||
|
cur = self._pgsql.cursor()
|
||||||
|
cur.execute(self._find_alarm_state_sql, alarm_id)
|
||||||
|
row = cur.fetchone()
|
||||||
|
state = row[0] if row is not None else None
|
||||||
|
return state
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
log.exception("Couldn't fetch current alarm state %s", e)
|
||||||
|
raise exc.DatabaseException(e)
|
||||||
|
23
monasca_notification/common/utils.py
Normal file
23
monasca_notification/common/utils.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import simport
|
||||||
|
|
||||||
|
|
||||||
|
def get_db_repo(config):
|
||||||
|
if 'database' in config and 'repo_driver' in config['database']:
|
||||||
|
return simport.load(config['database']['repo_driver'])(config)
|
||||||
|
else:
|
||||||
|
return simport.load('monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo')(config)
|
@ -1,4 +1,3 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP
|
# (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
@ -28,6 +27,7 @@ import time
|
|||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from notification_engine import NotificationEngine
|
from notification_engine import NotificationEngine
|
||||||
|
from periodic_engine import PeriodicEngine
|
||||||
from retry_engine import RetryEngine
|
from retry_engine import RetryEngine
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@ -77,9 +77,9 @@ def clean_exit(signum, frame=None):
|
|||||||
sys.exit(signum)
|
sys.exit(signum)
|
||||||
|
|
||||||
|
|
||||||
def start_process(process_type, config):
|
def start_process(process_type, config, *args):
|
||||||
log.info("start process: {}".format(process_type))
|
log.info("start process: {}".format(process_type))
|
||||||
p = process_type(config)
|
p = process_type(config, *args)
|
||||||
p.run()
|
p.run()
|
||||||
|
|
||||||
|
|
||||||
@ -107,7 +107,10 @@ def main(argv=None):
|
|||||||
processors.append(multiprocessing.Process(
|
processors.append(multiprocessing.Process(
|
||||||
target=start_process, args=(RetryEngine, config)))
|
target=start_process, args=(RetryEngine, config)))
|
||||||
|
|
||||||
# Start
|
if 60 in config['kafka']['periodic']:
|
||||||
|
processors.append(multiprocessing.Process(
|
||||||
|
target=start_process, args=(PeriodicEngine, config, 60)))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.info('Starting processes')
|
log.info('Starting processes')
|
||||||
for process in processors:
|
for process in processors:
|
||||||
|
@ -38,10 +38,12 @@ class Notification(object):
|
|||||||
'type',
|
'type',
|
||||||
'metrics',
|
'metrics',
|
||||||
'retry_count',
|
'retry_count',
|
||||||
'raw_alarm'
|
'raw_alarm',
|
||||||
|
'period',
|
||||||
|
'periodic_topic'
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, ntype, src_partition, src_offset, name, address,
|
def __init__(self, ntype, src_partition, src_offset, name, address, period,
|
||||||
retry_count, alarm):
|
retry_count, alarm):
|
||||||
"""Setup the notification object
|
"""Setup the notification object
|
||||||
The src_partition and src_offset allow the notification
|
The src_partition and src_offset allow the notification
|
||||||
@ -49,6 +51,7 @@ class Notification(object):
|
|||||||
ntype - The notification type
|
ntype - The notification type
|
||||||
name - Name used in sending
|
name - Name used in sending
|
||||||
address - where to send the notification
|
address - where to send the notification
|
||||||
|
period - period of sending the notification
|
||||||
retry_count - number of times we've tried to send
|
retry_count - number of times we've tried to send
|
||||||
alarm - info that caused the notification
|
alarm - info that caused the notification
|
||||||
notifications that come after this one to remain uncommitted.
|
notifications that come after this one to remain uncommitted.
|
||||||
@ -78,6 +81,10 @@ class Notification(object):
|
|||||||
# to be updated on actual notification send time
|
# to be updated on actual notification send time
|
||||||
self.notification_timestamp = None
|
self.notification_timestamp = None
|
||||||
|
|
||||||
|
# set periodic topic
|
||||||
|
self.periodic_topic = period
|
||||||
|
self.period = period
|
||||||
|
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
if not isinstance(other, Notification):
|
if not isinstance(other, Notification):
|
||||||
return False
|
return False
|
||||||
@ -106,7 +113,9 @@ class Notification(object):
|
|||||||
'severity',
|
'severity',
|
||||||
'link',
|
'link',
|
||||||
'lifecycle_state',
|
'lifecycle_state',
|
||||||
'tenant_id'
|
'tenant_id',
|
||||||
|
'period',
|
||||||
|
'periodic_topic'
|
||||||
]
|
]
|
||||||
notification_data = {name: getattr(self, name)
|
notification_data = {name: getattr(self, name)
|
||||||
for name in notification_fields}
|
for name in notification_fields}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import monascastatsd
|
import monascastatsd
|
||||||
|
import time
|
||||||
|
|
||||||
from monasca_common.kafka.consumer import KafkaConsumer
|
from monasca_common.kafka.consumer import KafkaConsumer
|
||||||
from monasca_common.kafka.producer import KafkaProducer
|
from monasca_common.kafka.producer import KafkaProducer
|
||||||
@ -42,16 +43,29 @@ class NotificationEngine(object):
|
|||||||
self._alarms = AlarmProcessor(self._alarm_ttl, config)
|
self._alarms = AlarmProcessor(self._alarm_ttl, config)
|
||||||
self._notifier = NotificationProcessor(config['notification_types'])
|
self._notifier = NotificationProcessor(config['notification_types'])
|
||||||
|
|
||||||
|
self._config = config
|
||||||
|
|
||||||
|
def _add_periodic_notifications(self, notifications):
|
||||||
|
for notification in notifications:
|
||||||
|
topic = notification.periodic_topic
|
||||||
|
if topic in self._config['kafka']['periodic']:
|
||||||
|
notification.notification_timestamp = time.time()
|
||||||
|
self._producer.publish(self._config['kafka']['periodic'][topic],
|
||||||
|
[notification.to_json()])
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
finished_count = self._statsd.get_counter(name='alarms_finished_count')
|
finished_count = self._statsd.get_counter(name='alarms_finished_count')
|
||||||
for alarm in self._consumer:
|
for alarm in self._consumer:
|
||||||
log.debug('Received alarm >|%s|<', str(alarm))
|
log.debug('Received alarm >|%s|<', str(alarm))
|
||||||
notifications, partition, offset = self._alarms.to_notification(alarm)
|
notifications, partition, offset = self._alarms.to_notification(alarm)
|
||||||
if notifications:
|
if notifications:
|
||||||
|
self._add_periodic_notifications(notifications)
|
||||||
|
|
||||||
sent, failed = self._notifier.send(notifications)
|
sent, failed = self._notifier.send(notifications)
|
||||||
self._producer.publish(self._topics['notification_topic'],
|
self._producer.publish(self._topics['notification_topic'],
|
||||||
[i.to_json() for i in sent])
|
[i.to_json() for i in sent])
|
||||||
self._producer.publish(self._topics['retry_topic'],
|
self._producer.publish(self._topics['retry_topic'],
|
||||||
[i.to_json() for i in failed])
|
[i.to_json() for i in failed])
|
||||||
|
|
||||||
self._consumer.commit()
|
self._consumer.commit()
|
||||||
finished_count.increment()
|
finished_count.increment()
|
||||||
|
101
monasca_notification/periodic_engine.py
Normal file
101
monasca_notification/periodic_engine.py
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import monascastatsd
|
||||||
|
import time
|
||||||
|
|
||||||
|
from monasca_common.kafka.consumer import KafkaConsumer
|
||||||
|
from monasca_common.kafka.producer import KafkaProducer
|
||||||
|
from monasca_notification.common.repositories import exceptions
|
||||||
|
from monasca_notification.common.utils import get_db_repo
|
||||||
|
from notification import Notification
|
||||||
|
from processors.base import BaseProcessor
|
||||||
|
from processors.notification_processor import NotificationProcessor
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PeriodicEngine(object):
|
||||||
|
def __init__(self, config, interval):
|
||||||
|
self._topic_name = config['kafka']['periodic'][interval]
|
||||||
|
|
||||||
|
self._statsd = monascastatsd.Client(name='monasca',
|
||||||
|
dimensions=BaseProcessor.dimensions)
|
||||||
|
|
||||||
|
zookeeper_path = config['zookeeper']['periodic_path'][interval]
|
||||||
|
self._consumer = KafkaConsumer(config['kafka']['url'],
|
||||||
|
config['zookeeper']['url'],
|
||||||
|
zookeeper_path,
|
||||||
|
config['kafka']['group'],
|
||||||
|
self._topic_name)
|
||||||
|
|
||||||
|
self._producer = KafkaProducer(config['kafka']['url'])
|
||||||
|
|
||||||
|
self._notifier = NotificationProcessor(config['notification_types'])
|
||||||
|
self._db_repo = get_db_repo(config)
|
||||||
|
|
||||||
|
def _keep_sending(self, alarm_id, original_state):
|
||||||
|
# Go to DB and check alarm state
|
||||||
|
try:
|
||||||
|
current_state = self._db_repo.get_alarm_current_state(alarm_id)
|
||||||
|
except exceptions.DatabaseException:
|
||||||
|
log.debug('Database Error. Attempting reconnect')
|
||||||
|
current_state = self._db_repo.get_alarm_current_state(alarm_id)
|
||||||
|
# Alarm was deleted
|
||||||
|
if current_state is None:
|
||||||
|
return False
|
||||||
|
# Alarm state changed
|
||||||
|
if current_state != original_state:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
for raw_notification in self._consumer:
|
||||||
|
partition = raw_notification[0]
|
||||||
|
offset = raw_notification[1].offset
|
||||||
|
message = raw_notification[1].message.value
|
||||||
|
|
||||||
|
notification_data = json.loads(message)
|
||||||
|
|
||||||
|
ntype = notification_data['type']
|
||||||
|
name = notification_data['name']
|
||||||
|
addr = notification_data['address']
|
||||||
|
period = notification_data['period']
|
||||||
|
|
||||||
|
notification = Notification(ntype,
|
||||||
|
partition,
|
||||||
|
offset,
|
||||||
|
name,
|
||||||
|
addr,
|
||||||
|
period,
|
||||||
|
notification_data['retry_count'],
|
||||||
|
notification_data['raw_alarm'])
|
||||||
|
|
||||||
|
if self._keep_sending(notification.alarm_id, notification.state):
|
||||||
|
wait_duration = period - (
|
||||||
|
time.time() - notification_data['notification_timestamp'])
|
||||||
|
|
||||||
|
if wait_duration > 0:
|
||||||
|
time.sleep(wait_duration)
|
||||||
|
|
||||||
|
notification.notification_timestamp = time.time()
|
||||||
|
|
||||||
|
self._notifier.send([notification])
|
||||||
|
self._producer.publish(self._topic_name,
|
||||||
|
[notification.to_json()])
|
||||||
|
|
||||||
|
self._consumer.commit()
|
@ -16,10 +16,10 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import monascastatsd
|
import monascastatsd
|
||||||
import simport
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from monasca_notification.common.repositories import exceptions as exc
|
from monasca_notification.common.repositories import exceptions as exc
|
||||||
|
from monasca_notification.common.utils import get_db_repo
|
||||||
from monasca_notification.notification import Notification
|
from monasca_notification.notification import Notification
|
||||||
from monasca_notification.notification_exceptions import AlarmFormatError
|
from monasca_notification.notification_exceptions import AlarmFormatError
|
||||||
from monasca_notification.processors.base import BaseProcessor
|
from monasca_notification.processors.base import BaseProcessor
|
||||||
@ -33,10 +33,7 @@ class AlarmProcessor(BaseProcessor):
|
|||||||
self._alarm_ttl = alarm_ttl
|
self._alarm_ttl = alarm_ttl
|
||||||
self._statsd = monascastatsd.Client(name='monasca',
|
self._statsd = monascastatsd.Client(name='monasca',
|
||||||
dimensions=BaseProcessor.dimensions)
|
dimensions=BaseProcessor.dimensions)
|
||||||
if 'database' in config and 'repo_driver' in config['database']:
|
self._db_repo = get_db_repo(config)
|
||||||
self._db_repo = simport.load(config['database']['repo_driver'])(config)
|
|
||||||
else:
|
|
||||||
self._db_repo = simport.load('monasca_notification.common.repositories.mysql.mysql_repo:MysqlRepo')(config)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _parse_alarm(alarm_data):
|
def _parse_alarm(alarm_data):
|
||||||
@ -92,6 +89,7 @@ class AlarmProcessor(BaseProcessor):
|
|||||||
offset,
|
offset,
|
||||||
alarms_action[1],
|
alarms_action[1],
|
||||||
alarms_action[2],
|
alarms_action[2],
|
||||||
|
alarms_action[3],
|
||||||
0,
|
0,
|
||||||
alarm) for alarms_action in alarms_actions]
|
alarm) for alarms_action in alarms_actions]
|
||||||
|
|
||||||
|
@ -60,12 +60,14 @@ class RetryEngine(object):
|
|||||||
ntype = notification_data['type']
|
ntype = notification_data['type']
|
||||||
name = notification_data['name']
|
name = notification_data['name']
|
||||||
addr = notification_data['address']
|
addr = notification_data['address']
|
||||||
|
period = notification_data['period']
|
||||||
|
|
||||||
notification = Notification(ntype,
|
notification = Notification(ntype,
|
||||||
partition,
|
partition,
|
||||||
offset,
|
offset,
|
||||||
name,
|
name,
|
||||||
addr,
|
addr,
|
||||||
|
period,
|
||||||
notification_data['retry_count'],
|
notification_data['retry_count'],
|
||||||
notification_data['raw_alarm'])
|
notification_data['raw_alarm'])
|
||||||
|
|
||||||
|
@ -4,6 +4,9 @@ kafka:
|
|||||||
alarm_topic: alarm-state-transitions
|
alarm_topic: alarm-state-transitions
|
||||||
notification_topic: alarm-notifications
|
notification_topic: alarm-notifications
|
||||||
notification_retry_topic: retry-notifications
|
notification_retry_topic: retry-notifications
|
||||||
|
periodic:
|
||||||
|
60: 60-seconds-notifications
|
||||||
|
|
||||||
max_offset_lag: 600 # In seconds, undefined for none
|
max_offset_lag: 600 # In seconds, undefined for none
|
||||||
|
|
||||||
database:
|
database:
|
||||||
@ -66,6 +69,8 @@ zookeeper:
|
|||||||
url: 192.168.10.4:2181 # or comma seperated list of multiple hosts
|
url: 192.168.10.4:2181 # or comma seperated list of multiple hosts
|
||||||
notification_path: /notification/alarms
|
notification_path: /notification/alarms
|
||||||
notification_retry_path: /notification/retry
|
notification_retry_path: /notification/retry
|
||||||
|
periodic_path:
|
||||||
|
60: /notification/60_seconds
|
||||||
|
|
||||||
logging: # Used in logging.dictConfig
|
logging: # Used in logging.dictConfig
|
||||||
version: 1
|
version: 1
|
||||||
|
@ -124,10 +124,10 @@ class TestAlarmProcessor(unittest.TestCase):
|
|||||||
"severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"}
|
"severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"}
|
||||||
alarm = self._create_raw_alarm(0, 4, alarm_dict)
|
alarm = self._create_raw_alarm(0, 4, alarm_dict)
|
||||||
|
|
||||||
sql_response = [['test notification', 'EMAIL', 'me@here.com']]
|
sql_response = [['test notification', 'EMAIL', 'me@here.com', 0]]
|
||||||
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
||||||
|
|
||||||
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', 0, alarm_dict)
|
test_notification = Notification('email', 0, 4, 'test notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.assertEqual(notifications, [test_notification])
|
self.assertEqual(notifications, [test_notification])
|
||||||
self.assertEqual(partition, 0)
|
self.assertEqual(partition, 0)
|
||||||
@ -141,11 +141,12 @@ class TestAlarmProcessor(unittest.TestCase):
|
|||||||
|
|
||||||
alarm = self._create_raw_alarm(0, 5, alarm_dict)
|
alarm = self._create_raw_alarm(0, 5, alarm_dict)
|
||||||
|
|
||||||
sql_response = [['test notification', 'EMAIL', 'me@here.com'], ['test notification2', 'EMAIL', 'me@here.com']]
|
sql_response = [['test notification', 'EMAIL', 'me@here.com', 0],
|
||||||
|
['test notification2', 'EMAIL', 'me@here.com', 0]]
|
||||||
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
notifications, partition, offset = self._run_alarm_processor(alarm, sql_response)
|
||||||
|
|
||||||
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', 0, alarm_dict)
|
test_notification = Notification('email', 0, 5, 'test notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', 0, alarm_dict)
|
test_notification2 = Notification('email', 0, 5, 'test notification2', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.assertEqual(notifications, [test_notification, test_notification2])
|
self.assertEqual(notifications, [test_notification, test_notification2])
|
||||||
self.assertEqual(partition, 0)
|
self.assertEqual(partition, 0)
|
||||||
|
@ -104,7 +104,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metric)
|
alarm_dict = alarm(metric)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
@ -222,7 +222,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
@ -268,7 +268,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
email_result = email.send_notification(notification)
|
email_result = email.send_notification(notification)
|
||||||
|
|
||||||
@ -311,7 +311,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
@ -351,7 +351,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
@ -391,7 +391,7 @@ class TestEmail(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metrics)
|
alarm_dict = alarm(metrics)
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.append(email.send_notification(notification))
|
self.trap.append(email.send_notification(notification))
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ def test_json():
|
|||||||
'metrics': 'cpu_util'}
|
'metrics': 'cpu_util'}
|
||||||
test_notification = notification.Notification('ntype', 'src_partition',
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
'src_offset', 'name',
|
'src_offset', 'name',
|
||||||
'address', 0, alarm)
|
'address', 0, 0, alarm)
|
||||||
|
|
||||||
expected_dict = {u'name': u'name',
|
expected_dict = {u'name': u'name',
|
||||||
u'type': u'ntype',
|
u'type': u'ntype',
|
||||||
@ -50,6 +50,58 @@ def test_json():
|
|||||||
u'alarm_timestamp': ts / 1000,
|
u'alarm_timestamp': ts / 1000,
|
||||||
u'address': u'address',
|
u'address': u'address',
|
||||||
u'message': u'stateChangeReason',
|
u'message': u'stateChangeReason',
|
||||||
|
u'period': 0,
|
||||||
|
u'periodic_topic': 0,
|
||||||
|
u'retry_count': 0,
|
||||||
|
u'raw_alarm': {
|
||||||
|
u'alarmId': u'alarmId',
|
||||||
|
u'alarmName': u'alarmName',
|
||||||
|
u'timestamp': ts,
|
||||||
|
u'stateChangeReason': u'stateChangeReason',
|
||||||
|
u'newState': u'newState',
|
||||||
|
u'severity': u'LOW',
|
||||||
|
u'link': u'some-link',
|
||||||
|
u'lifecycleState': u'OPEN',
|
||||||
|
u'tenantId': u'tenantId',
|
||||||
|
u'metrics': u'cpu_util'}}
|
||||||
|
|
||||||
|
# Compare as dicts so ordering is not an issue
|
||||||
|
assert json.loads(test_notification.to_json()) == expected_dict
|
||||||
|
|
||||||
|
|
||||||
|
def test_json_non_zero_period():
|
||||||
|
"""Test the to_json method to verify it behaves as expected.
|
||||||
|
"""
|
||||||
|
ts = 1429029121239
|
||||||
|
alarm = {'alarmId': 'alarmId',
|
||||||
|
'alarmName': 'alarmName',
|
||||||
|
'timestamp': ts,
|
||||||
|
'stateChangeReason': 'stateChangeReason',
|
||||||
|
'newState': 'newState',
|
||||||
|
'severity': 'LOW',
|
||||||
|
"link": "some-link",
|
||||||
|
"lifecycleState": "OPEN",
|
||||||
|
'tenantId': 'tenantId',
|
||||||
|
'metrics': 'cpu_util'}
|
||||||
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
|
'src_offset', 'name',
|
||||||
|
'address', 60, 0, alarm)
|
||||||
|
|
||||||
|
expected_dict = {u'name': u'name',
|
||||||
|
u'type': u'ntype',
|
||||||
|
u'notification_timestamp': None,
|
||||||
|
u'tenant_id': u'tenantId',
|
||||||
|
u'alarm_name': u'alarmName',
|
||||||
|
u'alarm_id': u'alarmId',
|
||||||
|
u'state': u'newState',
|
||||||
|
u'severity': u'LOW',
|
||||||
|
u'link': u'some-link',
|
||||||
|
u'lifecycle_state': u'OPEN',
|
||||||
|
u'alarm_timestamp': ts / 1000,
|
||||||
|
u'address': u'address',
|
||||||
|
u'message': u'stateChangeReason',
|
||||||
|
u'period': 60,
|
||||||
|
u'periodic_topic': 60,
|
||||||
u'retry_count': 0,
|
u'retry_count': 0,
|
||||||
u'raw_alarm': {
|
u'raw_alarm': {
|
||||||
u'alarmId': u'alarmId',
|
u'alarmId': u'alarmId',
|
||||||
@ -80,10 +132,10 @@ def test_equal():
|
|||||||
'metrics': 'cpu_util'}
|
'metrics': 'cpu_util'}
|
||||||
test_notification = notification.Notification('ntype', 'src_partition',
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
'src_offset', 'name',
|
'src_offset', 'name',
|
||||||
'address', 0, alarm)
|
'address', 0, 0, alarm)
|
||||||
test_notification2 = notification.Notification('ntype', 'src_partition',
|
test_notification2 = notification.Notification('ntype', 'src_partition',
|
||||||
'src_offset', 'name',
|
'src_offset', 'name',
|
||||||
'address', 0, alarm)
|
'address', 0, 0, alarm)
|
||||||
|
|
||||||
assert(test_notification == test_notification2)
|
assert(test_notification == test_notification2)
|
||||||
|
|
||||||
@ -101,10 +153,10 @@ def test_unequal():
|
|||||||
'metrics': 'cpu_util'}
|
'metrics': 'cpu_util'}
|
||||||
test_notification = notification.Notification('ntype', 'src_partition',
|
test_notification = notification.Notification('ntype', 'src_partition',
|
||||||
'src_offset', 'name',
|
'src_offset', 'name',
|
||||||
'address', 0, alarm)
|
'address', 0, 0, alarm)
|
||||||
test_notification2 = notification.Notification('ntype', 'src_partition',
|
test_notification2 = notification.Notification('ntype', 'src_partition',
|
||||||
'src_offset', 'name',
|
'src_offset', 'name',
|
||||||
'address', 0, alarm)
|
'address', 0, 0, alarm)
|
||||||
test_notification2.alarm_id = None
|
test_notification2.alarm_id = None
|
||||||
|
|
||||||
assert(test_notification != test_notification2)
|
assert(test_notification != test_notification2)
|
||||||
|
@ -88,7 +88,7 @@ class TestNotificationProcessor(unittest.TestCase):
|
|||||||
"timestamp": time.time(),
|
"timestamp": time.time(),
|
||||||
"metrics": metric}
|
"metrics": metric}
|
||||||
|
|
||||||
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, alarm_dict)
|
notification = Notification('email', 0, 1, 'email notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self._start_processor([notification])
|
self._start_processor([notification])
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ class TestNotificationProcessor(unittest.TestCase):
|
|||||||
alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM",
|
alarm_dict = {"tenantId": "0", "alarmId": "0", "alarmName": "test Alarm", "oldState": "OK", "newState": "ALARM",
|
||||||
"stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util",
|
"stateChangeReason": "I am alarming!", "timestamp": time.time(), "metrics": "cpu_util",
|
||||||
"severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"}
|
"severity": "LOW", "link": "http://some-place.com", "lifecycleState": "OPEN"}
|
||||||
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', 0, alarm_dict)
|
invalid_notification = Notification('invalid', 0, 1, 'test notification', 'me@here.com', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self._start_processor([invalid_notification])
|
self._start_processor([invalid_notification])
|
||||||
|
|
||||||
|
@ -227,7 +227,7 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', 0, alarm({})))
|
'me@here.com', 0, 0, alarm({})))
|
||||||
|
|
||||||
notifiers.send_notifications(notifications)
|
notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
@ -253,7 +253,7 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', 0, alarm({})))
|
'me@here.com', 0, 0, alarm({})))
|
||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
@ -282,7 +282,7 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('pagerduty', 0, 1,
|
notifications.append(Notification('pagerduty', 0, 1,
|
||||||
'pagerduty notification',
|
'pagerduty notification',
|
||||||
'me@here.com', 0, alarm({})))
|
'me@here.com', 0, 0, alarm({})))
|
||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
@ -315,13 +315,13 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', 0, alarm({})))
|
'me@here.com', 0, 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'foo@here.com', 0, alarm({})))
|
'foo@here.com', 0, 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'bar@here.com', 0, alarm({})))
|
'bar@here.com', 0, 0, alarm({})))
|
||||||
|
|
||||||
sent, failed, invalid = notifiers.send_notifications(notifications)
|
sent, failed, invalid = notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
@ -351,13 +351,13 @@ class TestInterface(unittest.TestCase):
|
|||||||
notifications = []
|
notifications = []
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'me@here.com', 0, alarm({})))
|
'me@here.com', 0, 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'foo@here.com', 0, alarm({})))
|
'foo@here.com', 0, 0, alarm({})))
|
||||||
notifications.append(Notification('email', 0, 1,
|
notifications.append(Notification('email', 0, 1,
|
||||||
'email notification',
|
'email notification',
|
||||||
'bar@here.com', 0, alarm({})))
|
'bar@here.com', 0, 0, alarm({})))
|
||||||
|
|
||||||
notifiers.send_notifications(notifications)
|
notifiers.send_notifications(notifications)
|
||||||
|
|
||||||
|
@ -150,6 +150,7 @@ class TestWebhook(unittest.TestCase):
|
|||||||
'pagerduty notification',
|
'pagerduty notification',
|
||||||
'ABCDEF',
|
'ABCDEF',
|
||||||
0,
|
0,
|
||||||
|
0,
|
||||||
alarm_dict)
|
alarm_dict)
|
||||||
|
|
||||||
self.trap.put(pagerduty.send_notification(notification))
|
self.trap.put(pagerduty.send_notification(notification))
|
||||||
|
@ -86,7 +86,7 @@ class TestWebhook(unittest.TestCase):
|
|||||||
|
|
||||||
alarm_dict = alarm(metric)
|
alarm_dict = alarm(metric)
|
||||||
|
|
||||||
notification = Notification('webhook', 0, 1, 'webhook notification', 'http://mock:3333/', 0, alarm_dict)
|
notification = Notification('webhook', 0, 1, 'webhook notification', 'http://mock:3333/', 0, 0, alarm_dict)
|
||||||
|
|
||||||
self.trap.put(webhook.send_notification(notification))
|
self.trap.put(webhook.send_notification(notification))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user