#!/usr/bin/env python
#
""" Notification Engine
This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
"""
import logging
from multiprocessing import Process, Queue
import os
import signal
import sys
import time

import yaml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException

from commit_tracker import KafkaCommitTracker
from kafka_consumer import KafkaConsumer
from processors.alarm import AlarmProcessor
from processors.notification import NotificationProcessor
from processors.sent_notification import SentNotificationProcessor
log = logging.getLogger(__name__)
processors = [] # global list to facilitate clean signal handling
def clean_exit(signum, frame=None):
""" Exit all processes cleanly
        Can be called on an OS signal or when the ZooKeeper connection is lost.
    """
    # todo - Figure out clean exiting: in most situations everything should shut down nicely, finishing anything already in the queues.
for process in processors:
process.terminate()
sys.exit(0)
def get_zookeeper_lock(url, topic):
""" Grab a lock in zookeeper or if not available retry in 30x
Add a listener to stop processes on
"""
lock_path = '/locks/mon-notification/%s' % topic
zookeeper = KazooClient(url)
zookeeper.start()
while True:
# The path is ephemeral so if it exists wait then cycle again
if zookeeper.exists(lock_path):
log.info('Another process has the lock for topic %s, waiting then retrying.' % topic)
time.sleep(15)
continue
try:
zookeeper.create(lock_path, ephemeral=True, makepath=True)
        except KazooException as e:
            # If creating the path fails, another process most likely beat us to it; try again
log.debug('Error creating lock path %s\n%s' % (lock_path, e))
continue
else:
            # Succeeded in grabbing the lock; continue on
log.info('Grabbed lock for topic %s' % topic)
break
    # Set up a listener to exit if we lose the connection. This always exits, even if the ZooKeeper connection is only
    # suspended; the process should be supervised so that it starts right back up again.
    zookeeper.add_listener(clean_exit)

def main(argv=None):
if argv is None:
argv = sys.argv
if len(argv) == 2:
config_file = argv[1]
elif len(argv) > 2:
print "Usage: " + argv[0] + " <config_file>"
print "Config file defaults to /etc/mon/notification.yaml"
return 1
else:
config_file = '/etc/mon/notification.yaml'
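    # The YAML config is expected to supply the keys read below; a minimal sketch
    # (key names taken from this file's accesses, values purely illustrative):
    #
    #   log_dir: /var/log/mon-notification
    #   zookeeper:
    #     url: 192.168.10.4:2181
    #   kafka:
    #     url: 192.168.10.4:9092
    #     group: mon-notification
    #     alarm_topic: alarms
    #     notification_topic: notifications
    #   mysql:
    #     host: 192.168.10.4
    #     user: notification
    #     passwd: password
    #     db: mon
    #   queues:
    #     alarms_size: 256
    #     notifications_size: 256
    #     sent_notifications_size: 50
    #   processors:
    #     alarm:
    #       number: 2
    #     notification:
    #       number: 4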
    with open(config_file, 'r') as f:
        config = yaml.safe_load(f)

    # Set up logging
log_path = os.path.join(config['log_dir'], 'notification.log')
    # todo - restore normal logging
logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(format='%(asctime)s %(message)s', filename=log_path, level=logging.INFO)
# Todo review the code structure, is there a better design I could use?
    # Create the queues
alarms = Queue(config['queues']['alarms_size'])
notifications = Queue(config['queues']['notifications_size'])
sent_notifications = Queue(config['queues']['sent_notifications_size'])
    # Used for tracking the progress of fully processed alarms
tracker = KafkaCommitTracker(config['zookeeper']['url'], config['kafka']['alarm_topic'])
    ## Define processes
    # The KafkaConsumer reads alarms from the alarm topic and feeds the alarms queue
kafka = Process(
target=KafkaConsumer(
alarms,
config['kafka']['url'],
config['kafka']['group'],
config['kafka']['alarm_topic'],
config['zookeeper']['url']
).run
)
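    # Note: each processor object is created here in the parent and its bound run()
    # method is used as the Process target, so the shared Queue objects passed in are
    # available to the child once the process is started below.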
processors.append(kafka)
    # Define AlarmProcessors
alarm_processors = []
for i in xrange(config['processors']['alarm']['number']):
alarm_processors.append(Process(
target=AlarmProcessor(
alarms,
notifications,
tracker,
config['mysql']['host'],
config['mysql']['user'],
config['mysql']['passwd'],
config['mysql']['db']
).run)
)
processors.extend(alarm_processors)
    # Define NotificationProcessors
notification_processors = []
for i in xrange(config['processors']['notification']['number']):
notification_processors.append(Process(target=NotificationProcessor(notifications, sent_notifications).run))
processors.extend(notification_processors)
    # Define SentNotificationProcessor
sent_notification_processor = Process(
target=SentNotificationProcessor(
sent_notifications,
tracker,
config['kafka']['url'],
config['kafka']['notification_topic']
).run
)
processors.append(sent_notification_processor)
## Start
signal.signal(signal.SIGTERM, clean_exit)
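    # Blocks until this process holds the ZooKeeper lock for the alarm topic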
get_zookeeper_lock(config['zookeeper']['url'], config['kafka']['alarm_topic'])
try:
log.info('Starting processes')
for process in processors:
process.start()
        # Child processes shouldn't normally exit, so these joins wait indefinitely;
        # without the join the parent would return immediately (which ends the run under a debugger).
for process in processors:
process.join()
    except:
        log.exception('Error! Exiting.')
        for process in processors:
            process.terminate()
        return 1
if __name__ == "__main__":
sys.exit(main())