#!/usr/bin/env python
#
""" Notification Engine
This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
"""
import logging
from multiprocessing import Process, Queue
import os
import signal
import sys
import yaml
from state_tracker import ZookeeperStateTracker
from processors.kafka_consumer import KafkaConsumer
from processors.alarm_processor import AlarmProcessor
from processors.notification_processor import NotificationProcessor
from processors.sent_notification_processor import SentNotificationProcessor
log = logging.getLogger(__name__)
processors = []  # global list of child Process objects so the signal handler can terminate them all
def clean_exit(signum, frame=None):
    """ Exit all processes cleanly.

        Can be called on an os signal or on zookeeper losing connection.

        signum - The signal number received (or an arbitrary value when
                 invoked as a zookeeper lock-loss callback).
        frame  - The interrupted stack frame, supplied by the signal
                 module; unused here.
    """
    # Terminate every child process registered in the module-level list
    # before exiting the main process.
    for process in processors:
        process.terminate()
    sys.exit(0)
def main(argv=None):
    """ Run the notification engine.

        Loads the yaml config, configures logging, builds the queues and
        the processing pipeline (kafka consumer -> alarm processors ->
        notification processors -> sent notification processor), then runs
        the zookeeper offset tracker in the main process.

        argv - command line arguments; defaults to sys.argv. An optional
               single argument is the config file path.
        Returns 0 implicitly on clean run, 1 on usage error or failure.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) == 2:
        config_file = argv[1]
    elif len(argv) > 2:
        # Single-argument print() call form is valid in both python 2 and 3.
        print("Usage: " + argv[0] + " <config_file>")
        print("Config file defaults to /etc/mon/notification.yaml")
        return 1
    else:
        config_file = '/etc/mon/notification.yaml'

    # Context manager closes the config file handle (the original leaked it);
    # safe_load avoids constructing arbitrary python objects from yaml tags.
    with open(config_file, 'r') as f:
        config = yaml.safe_load(f)

    # Setup logging
    log_path = os.path.join(config['log_dir'], 'notification.log')
    logging.basicConfig(format='%(asctime)s %(message)s', filename=log_path, level=logging.INFO)
    # Quiet the chatty client libraries
    logging.getLogger('kazoo').setLevel(logging.WARN)
    logging.getLogger('kafka').setLevel(logging.WARN)

    # Create the queues
    alarms = Queue(config['queues']['alarms_size'])
    notifications = Queue(config['queues']['notifications_size'])  # data is a list of notification objects
    sent_notifications = Queue(config['queues']['sent_notifications_size'])  # data is a list of notification objects
    finished = Queue(config['queues']['finished_size'])  # data is of the form (partition, offset)

    # State Tracker - tracks the progress of fully processed alarms and holds the zookeeper lock
    tracker = ZookeeperStateTracker(config['zookeeper']['url'], config['kafka']['alarm_topic'], finished)
    tracker.lock(clean_exit)  # Only begin if we have the processing lock

    ## Define processors
    # KafkaConsumer
    kafka = Process(
        target=KafkaConsumer(
            alarms,
            config['kafka']['url'],
            config['kafka']['group'],
            config['kafka']['alarm_topic'],
            tracker.get_offsets()
        ).run
    )
    processors.append(kafka)

    # AlarmProcessors
    alarm_processors = []
    for i in xrange(config['processors']['alarm']['number']):
        alarm_processors.append(Process(
            target=AlarmProcessor(
                alarms,
                notifications,
                finished,
                config['mysql']['host'],
                config['mysql']['user'],
                config['mysql']['passwd'],
                config['mysql']['db']
            ).run)
        )
    processors.extend(alarm_processors)

    # NotificationProcessors
    notification_processors = []
    for i in xrange(config['processors']['notification']['number']):
        notification_processors.append(Process(
            target=NotificationProcessor(
                notifications,
                sent_notifications,
                finished,
                config['email']
            ).run)
        )
    processors.extend(notification_processors)

    # SentNotificationProcessor
    sent_notification_processor = Process(
        target=SentNotificationProcessor(
            sent_notifications,
            finished,
            config['kafka']['url'],
            config['kafka']['notification_topic']
        ).run
    )
    processors.append(sent_notification_processor)

    ## Start
    signal.signal(signal.SIGTERM, clean_exit)
    try:
        log.info('Starting processes')
        for process in processors:
            process.start()
        tracker.run()  # Runs in the main process
    except Exception:
        # Narrowed from a bare except so SystemExit/KeyboardInterrupt
        # propagate; log the failure, stop the children, signal failure.
        log.exception('Error exiting!')
        for process in processors:
            process.terminate()
        return 1
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())