#!/usr/bin/env python
#
"""Notification Engine

This engine reads alarms from Kafka and then notifies the customer using
their configured notification method.
"""
import logging
import os
import signal
import sys
from multiprocessing import Process, Queue

import yaml

from kafka_consumer import KafkaConsumer
from processors.alarm import AlarmProcessor
from processors.notification import NotificationProcessor
from processors.sent_notification import SentNotificationProcessor

log = logging.getLogger(__name__)

processors = []  # global list to facilitate clean signal handling


def clean_exit(signum, frame):
    """Exit cleanly on defined signals."""
    # todo - Figure out good exiting. For most situations, make sure it all
    # shuts down nicely, finishing up anything in the queue.
    for process in processors:
        process.terminate()


def main(argv=None):
    if argv is None:
        argv = sys.argv
    if len(argv) == 2:
        config_file = argv[1]
    elif len(argv) > 2:
        print "Usage: " + argv[0] + " <config_file>"
        print "Config file defaults to /etc/mon/notification.yaml"
        return 1
    else:
        config_file = '/etc/mon/notification.yaml'

    config = yaml.load(open(config_file, 'r'))
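
    # Illustrative sketch of the config layout this module expects, inferred
    # from the keys read below (values are example assumptions, not an
    # authoritative schema):
    #
    #   log_dir: /var/log/mon
    #   kafka:
    #     url: 192.168.10.4:9092      # broker address - example value
    #     group: mon_notification     # consumer group - example value
    #     alarm_topic: alarm
    #     notification_topic: notification
    #   queues:
    #     alarms_size: 1024           # example sizes
    #     notifications_size: 1024
    #     sent_notifications_size: 1024
    #   processors:
    #     alarm:
    #       number: 2                 # example counts
    #     notification:
    #       number: 2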

    # Setup logging
    log_path = os.path.join(config['log_dir'], 'notification.log')
    # todo restore normal logging
    logging.basicConfig(level=logging.DEBUG)
    # logging.basicConfig(format='%(asctime)s %(message)s', filename=log_path, level=logging.INFO)

    # Create the queues
    alarms = Queue(config['queues']['alarms_size'])
    notifications = Queue(config['queues']['notifications_size'])
    sent_notifications = Queue(config['queues']['sent_notifications_size'])
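
    # Intended pipeline, sketched from the processors and queues defined in
    # this module (the alarm and notification stages below are still
    # commented out while the Kafka basics are tested):
    #   KafkaConsumer -> alarms -> AlarmProcessor -> notifications
    #     -> NotificationProcessor -> sent_notifications -> SentNotificationProcessor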

    ## Define processes
    # Start KafkaConsumer
    kafka = Process(
        target=KafkaConsumer(
            config['kafka']['url'],
            config['kafka']['group'],
            config['kafka']['alarm_topic'],
            alarms
        ).run
    )
    processors.append(kafka)

    # #Define AlarmProcessors
    # alarm_processors = []
    # for i in xrange(config['processors']['alarm']['number']):
    #     alarm_processors.append(Process(target=AlarmProcessor(config, alarms, notifications).run))  # todo don't pass the config object just the bits needed
    # processors.extend(alarm_processors)
    #
    # #Define NotificationProcessors
    # notification_processors = []
    # for i in xrange(config['processors']['notification']['number']):
    #     notification_processors.append(Process(target=NotificationProcessor(config, notifications, sent_notifications).run))  # todo don't pass the config object just the bits needed
    # processors.extend(notification_processors)
    #
    # Define SentNotificationProcessor
    # todo temp setup with the wrong queue to just test kafka basics
    sent_notification_processor = Process(
        target=SentNotificationProcessor(
            config['kafka']['url'],
            config['kafka']['group'],
            config['kafka']['alarm_topic'],
            config['kafka']['notification_topic'],
            alarms
        ).run
    )
    # sent_notification_processor = Process(
    #     target=SentNotificationProcessor(
    #         config['kafka']['url'],
    #         config['kafka']['group'],
    #         config['kafka']['alarm_topic'],
    #         config['kafka']['notification_topic'],
    #         sent_notifications
    #     ).run
    # )
    processors.append(sent_notification_processor)

    ## Start
    signal.signal(signal.SIGTERM, clean_exit)
    try:
        log.info('Starting processes')
        for process in processors:
            process.start()
    except:
        log.exception('Error exiting!')
        for process in processors:
            process.terminate()


if __name__ == "__main__":
    sys.exit(main())