135 lines
4.2 KiB
Python
Raw Normal View History

2014-02-27 16:55:07 -07:00
#!/usr/bin/env python
#
""" Notification Engine
This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
"""
import logging
2014-03-17 11:26:34 -06:00
import logging.config
2014-02-27 16:55:07 -07:00
from multiprocessing import Process, Queue
import signal
import sys
2014-03-12 11:31:57 -06:00
import yaml
2014-02-27 16:55:07 -07:00
from state_tracker import ZookeeperStateTracker
from processors.kafka_consumer import KafkaConsumer
from processors.alarm_processor import AlarmProcessor
from processors.notification_processor import NotificationProcessor
from processors.sent_notification_processor import SentNotificationProcessor
2014-02-27 16:55:07 -07:00
2014-03-12 11:31:57 -06:00
2014-02-27 16:55:07 -07:00
# Module-level logger; main() configures its handlers through logging.config.dictConfig.
log = logging.getLogger(__name__)
# Every child Process created by main() is collected here so that clean_exit,
# which runs as a signal handler, can reach and terminate all of them.
processors = list()
def clean_exit(signum, frame=None):
""" Exit all processes cleanly
2014-03-13 16:16:26 -06:00
Can be called on an os signal or no zookeeper losing connection.
2014-02-27 16:55:07 -07:00
"""
for process in processors:
2014-03-17 15:05:17 -06:00
# Since this is set up as a handler for SIGCHLD when this kills one child it gets another signal, the result
# everything comes crashing down with some exceptions thrown for already dead processes
try:
process.terminate()
except:
pass
sys.exit(0)
2014-02-27 16:55:07 -07:00
def main(argv=None):
    """Run the notification engine.

    Loads the YAML config, sets up logging and the inter-process queues, and
    acquires the zookeeper processing lock. It then starts the worker processes
    (KafkaConsumer, AlarmProcessor(s), NotificationProcessor(s),
    SentNotificationProcessor) and runs the state tracker in this process.
    Judging by argument order, the workers are chained through the ``alarms``,
    ``notifications`` and ``sent_notifications`` queues, and report progress on
    ``finished`` (confirm in processors/).

    :param argv: argument vector, defaulting to ``sys.argv``. ``argv[1]``, if
        present, is the config file path (default /etc/mon/notification.yaml).
    :returns: 1 on a usage error, or when an unexpected exception stops the
        engine. Returns None if ``tracker.run()`` returns normally. A shutdown
        through ``clean_exit`` leaves via SystemExit(0) instead.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) == 2:
        config_file = argv[1]
    elif len(argv) > 2:
        print("Usage: " + argv[0] + " <config_file>")
        print("Config file defaults to /etc/mon/notification.yaml")
        return 1
    else:
        config_file = '/etc/mon/notification.yaml'

    # The with-block closes the config file, which the original left open.
    # Use safe_load because yaml.load without an explicit Loader can build
    # arbitrary Python objects, and raises TypeError on PyYAML >= 6.
    with open(config_file, 'r') as config_fh:
        config = yaml.safe_load(config_fh)

    # Setup logging
    logging.config.dictConfig(config['logging'])

    # Create the queues
    queue_sizes = config['queues']
    alarms = Queue(queue_sizes['alarms_size'])
    notifications = Queue(queue_sizes['notifications_size'])  # data is a list of notification objects
    sent_notifications = Queue(queue_sizes['sent_notifications_size'])  # data is a list of notification objects
    finished = Queue(queue_sizes['finished_size'])  # Data is of the form (partition, offset)

    # State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
    tracker = ZookeeperStateTracker(config['zookeeper']['url'], config['kafka']['alarm_topic'], finished)
    tracker.lock(clean_exit)  # Only begin if we have the processing lock

    ## Define processors
    # KafkaConsumer
    kafka = Process(
        target=KafkaConsumer(
            alarms,
            config['kafka']['url'],
            config['kafka']['group'],
            config['kafka']['alarm_topic'],
            tracker.get_offsets()
        ).run
    )
    processors.append(kafka)

    # AlarmProcessors - one independent AlarmProcessor instance per process
    alarm_processors = [
        Process(
            target=AlarmProcessor(
                alarms,
                notifications,
                finished,
                config['mysql']['host'],
                config['mysql']['user'],
                config['mysql']['passwd'],
                config['mysql']['db']
            ).run
        )
        for _ in range(config['processors']['alarm']['number'])
    ]
    processors.extend(alarm_processors)

    # NotificationProcessors - one independent NotificationProcessor instance per process
    notification_processors = [
        Process(
            target=NotificationProcessor(
                notifications,
                sent_notifications,
                finished,
                config['email']
            ).run
        )
        for _ in range(config['processors']['notification']['number'])
    ]
    processors.extend(notification_processors)

    # SentNotificationProcessor
    sent_notification_processor = Process(
        target=SentNotificationProcessor(
            sent_notifications,
            finished,
            config['kafka']['url'],
            config['kafka']['notification_topic']
        ).run
    )
    processors.append(sent_notification_processor)

    ## Start
    # NOTE(review): these handlers are installed before process.start(). With
    # the fork start method, every child inherits clean_exit as its
    # SIGTERM/SIGCHLD handler too. Confirm the processors expect that.
    signal.signal(signal.SIGTERM, clean_exit)
    signal.signal(signal.SIGCHLD, clean_exit)  # any child exiting shuts the whole engine down
    try:
        log.info('Starting processes')
        for process in processors:
            process.start()
        tracker.run()  # Runs in the main process
    except (Exception, KeyboardInterrupt):
        # SystemExit is deliberately not caught. clean_exit raises it after it
        # has already terminated the children, so that is a normal shutdown,
        # not an error.
        log.exception('Error exiting!')
        # Without this, the first child we terminate would fire clean_exit
        # through SIGCHLD, hijacking this cleanup and exiting with status 0.
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        for process in processors:
            try:
                process.terminate()
            except Exception:
                # The failure may have happened part-way through the start
                # loop, leaving later processes unstarted, or a child may
                # already be gone.
                log.debug('Unable to terminate %s', process, exc_info=True)
        return 1
if __name__ == "__main__":
    # Use main()'s return value (None -> 0, 1 on usage error/failure) as the exit status.
    exit_status = main()
    sys.exit(exit_status)