Notification Engine for Monasca
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

main.py 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # (C) Copyright 2014-2017 Hewlett Packard Enterprise Development LP
  2. # Copyright 2017 FUJITSU LIMITED
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  13. # implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """ Notification Engine
  17. This engine reads alarms from Kafka and then notifies the customer using their configured
  18. notification method.
  19. """
  20. import multiprocessing
  21. import os
  22. import signal
  23. import sys
  24. import time
  25. import warnings
  26. from oslo_log import log
  27. from monasca_notification import config
  28. from monasca_notification import notification_engine
  29. from monasca_notification import periodic_engine
  30. from monasca_notification import retry_engine
  31. LOG = log.getLogger(__name__)
  32. CONF = config.CONF
  33. processors = [] # global list to facilitate clean signal handling
  34. exiting = False
  35. def clean_exit(signum, frame=None):
  36. """Exit all processes attempting to finish uncommitted active work before exit.
  37. Can be called on an os signal or no zookeeper losing connection.
  38. """
  39. global exiting
  40. if exiting:
  41. # Since this is set up as a handler for SIGCHLD when this kills one
  42. # child it gets another signal, the global exiting avoids this running
  43. # multiple times.
  44. LOG.debug('Exit in progress clean_exit received additional signal %s' % signum)
  45. return
  46. LOG.info('Received signal %s, beginning graceful shutdown.' % signum)
  47. exiting = True
  48. wait_for_exit = False
  49. for process in processors:
  50. try:
  51. if process.is_alive():
  52. # Sends sigterm which any processes after a notification is sent attempt to handle
  53. process.terminate()
  54. wait_for_exit = True
  55. except Exception: # nosec
  56. # There is really nothing to do if the kill fails, so just go on.
  57. # The # nosec keeps bandit from reporting this as a security issue
  58. pass
  59. # wait for a couple seconds to give the subprocesses a chance to shut down correctly.
  60. if wait_for_exit:
  61. time.sleep(2)
  62. # Kill everything, that didn't already die
  63. for child in multiprocessing.active_children():
  64. LOG.debug('Killing pid %s' % child.pid)
  65. try:
  66. os.kill(child.pid, signal.SIGKILL)
  67. except Exception: # nosec
  68. # There is really nothing to do if the kill fails, so just go on.
  69. # The # nosec keeps bandit from reporting this as a security issue
  70. pass
  71. if signum == signal.SIGTERM:
  72. sys.exit(0)
  73. sys.exit(signum)
  74. def start_process(process_type, *args):
  75. LOG.info("start process: {}".format(process_type))
  76. p = process_type(*args)
  77. p.run()
  78. def main(argv=None):
  79. warnings.simplefilter('always')
  80. config.parse_args(argv=argv)
  81. for proc in range(0, CONF.notification_processor.number):
  82. processors.append(multiprocessing.Process(
  83. target=start_process,
  84. args=(notification_engine.NotificationEngine,))
  85. )
  86. processors.append(multiprocessing.Process(
  87. target=start_process,
  88. args=(retry_engine.RetryEngine,))
  89. )
  90. if 60 in CONF.kafka.periodic:
  91. processors.append(multiprocessing.Process(
  92. target=start_process,
  93. args=(periodic_engine.PeriodicEngine, 60))
  94. )
  95. try:
  96. LOG.info('Starting processes')
  97. for process in processors:
  98. process.start()
  99. # The signal handlers must be added after the processes start otherwise
  100. # they run on all processes
  101. signal.signal(signal.SIGCHLD, clean_exit)
  102. signal.signal(signal.SIGINT, clean_exit)
  103. signal.signal(signal.SIGTERM, clean_exit)
  104. while True:
  105. time.sleep(10)
  106. except Exception:
  107. LOG.exception('Error! Exiting.')
  108. clean_exit(signal.SIGKILL)
  109. if __name__ == "__main__":
  110. sys.exit(main(sys.argv[1:]))