monasca-notification/mon_notification/main.py

184 lines
6.1 KiB
Python

#!/usr/bin/env python
#
""" Notification Engine
This engine reads alarms from Kafka and then notifies the customer using their configured notification method.
"""
import logging
import logging.config
import multiprocessing
import os
import signal
from state_tracker import ZookeeperStateTracker
import sys
import threading
import time
import yaml
from processors.alarm_processor import AlarmProcessor
from processors.kafka_consumer import KafkaConsumer
from processors.notification_processor import NotificationProcessor
from processors.sent_notification_processor import SentNotificationProcessor
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Every child process built by main(), in the order they are started. Kept
# global so the clean_exit signal handler can reach them.
processors = []
# Set to True by clean_exit once shutdown has begun, so that further signals
# arriving during shutdown are ignored.
exiting = False
def clean_exit(signum, frame=None):
    """Exit all processes, attempting to finish uncommitted active work before exit.

    Can be called on an OS signal or on zookeeper losing connection (it is the
    callback handed to ``tracker.lock``).

    Args:
        signum: Signal number, or any printable reason for the shutdown (main()
            passes the string 'tracker died'). Only used in log messages.
        frame: Stack frame supplied by the signal machinery; unused.

    Raises:
        SystemExit: with status 0 once shutdown has completed. If a shutdown is
            already in progress the call logs and returns None instead.
    """
    global exiting
    if exiting:
        # This is also the SIGCHLD handler, so every child killed below delivers
        # another signal; the global flag keeps the shutdown from running twice.
        log.debug('Exit in progress clean_exit received additional signal %s', signum)
        return

    log.info('Received signal %s, beginning graceful shutdown.', signum)
    exiting = True

    # The final processor is the sent_notification processor. Skip it (and leave
    # the tracker running) so that notifications which were already sent can
    # still be fully processed.
    for process in processors[:-1]:
        try:
            if process.is_alive():
                # Sends SIGTERM, which processes past the point of sending a
                # notification attempt to handle.
                process.terminate()
        except Exception:
            # Best effort: one stubborn child must not stop the shutdown, but
            # leave a trace instead of hiding the failure.
            log.warning('Unable to terminate process %r', process, exc_info=True)

    tracker.stop = True

    # Give active processing up to max_wait_count * wait_seconds (2 minutes) to
    # finish, which is signalled by the tracker giving up its lock.
    max_wait_count = 6
    wait_seconds = 20
    while tracker.has_lock:
        if max_wait_count == 0:
            log.debug('Max wait reached, proceeding to kill processes')
            break
        log.debug('Waiting for all active processing to stop.')
        max_wait_count -= 1
        time.sleep(wait_seconds)

    # Kill everything that didn't already die.
    for child in multiprocessing.active_children():
        log.debug('Killing pid %s', child.pid)
        try:
            os.kill(child.pid, signal.SIGKILL)
        except Exception:
            # The child may have exited between being listed and being killed.
            log.debug('Unable to kill pid %s', child.pid, exc_info=True)

    sys.exit(0)
def main(argv=None):
    """Load the configuration, build the processing pipeline and supervise it.

    The pipeline is a chain of child processes joined by bounded queues:
    KafkaConsumer -> AlarmProcessor(s) -> NotificationProcessor(s) ->
    SentNotificationProcessor, with a ZookeeperStateTracker running in a thread
    of this process.

    Args:
        argv: Command line as a list with the program name first; defaults to
            ``sys.argv``. A single optional argument names the YAML config
            file, otherwise /etc/mon/notification.yaml is used.

    Returns:
        1 if too many arguments were given (after printing usage). None if
        start-up or supervision raised an Exception. A normal shutdown does not
        return: clean_exit leaves through sys.exit().
    """
    if argv is None:
        argv = sys.argv
    if len(argv) == 2:
        config_file = argv[1]
    elif len(argv) > 2:
        print("Usage: " + argv[0] + " <config_file>")
        print("Config file defaults to /etc/mon/notification.yaml")
        return 1
    else:
        config_file = '/etc/mon/notification.yaml'

    # NOTE: safe_load replaces the bare yaml.load(), which can construct
    # arbitrary Python objects and requires an explicit Loader on PyYAML >= 6.
    # The with-block closes the file handle the original left open.
    with open(config_file, 'r') as config_stream:
        config = yaml.safe_load(config_stream)

    # Setup logging
    logging.config.dictConfig(config['logging'])

    # Create the queues
    alarms = multiprocessing.Queue(config['queues']['alarms_size'])
    notifications = multiprocessing.Queue(config['queues']['notifications_size'])  # [notification_object, ]
    sent_notifications = multiprocessing.Queue(config['queues']['sent_notifications_size'])  # [notification_object, ]
    finished = multiprocessing.Queue(config['queues']['finished_size'])  # Data is of the form (partition, offset)

    # State Tracker - Used for tracking the progress of fully processed alarms and the zookeeper lock
    global tracker  # Set to global for use in the cleanup function
    tracker = ZookeeperStateTracker(
        config['zookeeper']['url'], config['kafka']['alarm_topic'], finished, config['zookeeper']['max_offset_lag'])
    tracker.lock(clean_exit)  # Only begin if we have the processing lock
    tracker_thread = threading.Thread(target=tracker.run)

    # Define processors. Order matters: clean_exit relies on the
    # SentNotificationProcessor being the last entry in `processors`.

    # KafkaConsumer
    kafka = multiprocessing.Process(
        target=KafkaConsumer(
            alarms,
            config['kafka']['url'],
            config['kafka']['group'],
            config['kafka']['alarm_topic'],
            tracker.offsets
        ).run
    )
    processors.append(kafka)

    # AlarmProcessors
    alarm_processors = []
    for _ in range(config['processors']['alarm']['number']):
        alarm_processors.append(multiprocessing.Process(
            target=AlarmProcessor(
                alarms,
                notifications,
                finished,
                config['processors']['alarm']['ttl'],
                config['mysql']['host'],
                config['mysql']['user'],
                config['mysql']['passwd'],
                config['mysql']['db']
            ).run
        ))
    processors.extend(alarm_processors)

    # NotificationProcessors
    notification_processors = []
    for _ in range(config['processors']['notification']['number']):
        notification_processors.append(multiprocessing.Process(
            target=NotificationProcessor(
                notifications,
                sent_notifications,
                finished,
                config['email']
            ).run
        ))
    processors.extend(notification_processors)

    # SentNotificationProcessor
    sent_notification_processor = multiprocessing.Process(
        target=SentNotificationProcessor(
            sent_notifications,
            finished,
            config['kafka']['url'],
            config['kafka']['notification_topic']
        ).run
    )
    processors.append(sent_notification_processor)

    # Start
    try:
        log.info('Starting processes')
        for process in processors:
            process.start()

        # The offset tracker runs in a thread so the signal handler can run concurrently and cleanly shutdown
        tracker_thread.start()

        # The signal handlers must be added after the processes start otherwise they run on all processes
        signal.signal(signal.SIGCHLD, clean_exit)
        signal.signal(signal.SIGINT, clean_exit)
        signal.signal(signal.SIGTERM, clean_exit)

        # If the tracker fails exit
        while True:
            if tracker_thread.is_alive():
                time.sleep(5)
            else:
                tracker.has_lock = False
                # Raises SystemExit, which the handler below does not catch.
                clean_exit('tracker died', None)
    except Exception:
        log.exception('Error exiting!')
        for process in processors:
            # terminate() raises on a process that was never started (for
            # example when start() itself failed part way through the list),
            # which would abort the cleanup; only signal live children.
            if process.is_alive():
                process.terminate()
        # NOTE(review): falls through returning None, so the process exit
        # status is 0 even after a fatal error -- confirm whether that is
        # intended before changing it.
# Script entry point: whatever main() returns becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())