Notification Engine for Monasca
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

alarm_processor.py 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
  2. # Copyright 2017 FUJITSU LIMITED
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  13. # implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import time
  17. from oslo_config import cfg
  18. from oslo_log import log as logging
  19. import six
  20. import ujson as json
  21. from monasca_notification.common.repositories import exceptions as exc
  22. from monasca_notification.common.utils import get_db_repo
  23. from monasca_notification.common.utils import get_statsd_client
  24. from monasca_notification import notification
  25. from monasca_notification import notification_exceptions
  26. log = logging.getLogger(__name__)
  27. CONF = cfg.CONF
  28. class AlarmProcessor(object):
  29. def __init__(self):
  30. self._alarm_ttl = CONF.alarm_processor.ttl
  31. self._statsd = get_statsd_client()
  32. self._db_repo = get_db_repo()
  33. @staticmethod
  34. def _parse_alarm(alarm_data):
  35. """Parse the alarm message making sure it matches the expected format.
  36. """
  37. expected_fields = [
  38. 'actionsEnabled',
  39. 'alarmId',
  40. 'alarmDefinitionId',
  41. 'alarmName',
  42. 'newState',
  43. 'oldState',
  44. 'stateChangeReason',
  45. 'severity',
  46. 'link',
  47. 'lifecycleState',
  48. 'tenantId',
  49. 'timestamp'
  50. ]
  51. # check if alarm_data is <class 'bytes'>
  52. # if yes convert it to standard string
  53. if isinstance(alarm_data, six.binary_type):
  54. alarm_data = alarm_data.decode("utf-8")
  55. json_alarm = json.loads(alarm_data)
  56. alarm = json_alarm['alarm-transitioned']
  57. for field in expected_fields:
  58. if field not in alarm:
  59. raise notification_exceptions.AlarmFormatError(
  60. 'Alarm data missing field %s' % field)
  61. if ('tenantId' not in alarm) or ('alarmId' not in alarm):
  62. raise notification_exceptions.AlarmFormatError
  63. return alarm
  64. def _alarm_is_valid(self, alarm):
  65. """Check if the alarm is enabled and is within the ttl, return True in that case
  66. """
  67. if not alarm['actionsEnabled']:
  68. log.debug('Actions are disabled for this alarm.')
  69. return False
  70. alarm_age = time.time() - alarm['timestamp'] / 1000
  71. if (self._alarm_ttl is not None) and (alarm_age > self._alarm_ttl):
  72. log.warn('Received alarm older than the ttl, skipping. Alarm from %s' %
  73. time.ctime(alarm['timestamp'] / 1000))
  74. return False
  75. return True
  76. def _build_notification(self, alarm):
  77. db_time = self._statsd.get_timer()
  78. with db_time.time('config_db_time'):
  79. alarms_actions = self._db_repo.fetch_notifications(alarm)
  80. return [notification.Notification(
  81. alarms_action[0],
  82. alarms_action[1],
  83. alarms_action[2],
  84. alarms_action[3],
  85. alarms_action[4],
  86. 0,
  87. alarm) for alarms_action in alarms_actions]
  88. def to_notification(self, raw_alarm):
  89. """Check the notification setting for this project then create the appropriate notification
  90. """
  91. failed_parse_count = self._statsd.get_counter(name='alarms_failed_parse_count')
  92. no_notification_count = self._statsd.get_counter(name='alarms_no_notification_count')
  93. notification_count = self._statsd.get_counter(name='created_count')
  94. partition = raw_alarm[0]
  95. offset = raw_alarm[1].offset
  96. try:
  97. alarm = self._parse_alarm(raw_alarm[1].message.value)
  98. except Exception as e: # This is general because of a lack of json exception base class
  99. failed_parse_count += 1
  100. log.exception("Invalid Alarm format skipping partition %d, offset %d\nError%s" % (partition, offset, e))
  101. return [], partition, offset
  102. log.debug("Read alarm from alarms sent_queue. Partition %d, Offset %d, alarm data %s"
  103. % (partition, offset, alarm))
  104. if not self._alarm_is_valid(alarm):
  105. no_notification_count += 1
  106. return [], partition, offset
  107. try:
  108. notifications = self._build_notification(alarm)
  109. except exc.DatabaseException:
  110. log.debug('Database Error. Attempting reconnect')
  111. notifications = self._build_notification(alarm)
  112. if len(notifications) == 0:
  113. no_notification_count += 1
  114. log.debug('No notifications found for this alarm, partition %d, offset %d, alarm data %s'
  115. % (partition, offset, alarm))
  116. return [], partition, offset
  117. else:
  118. log.debug('Found %d notifications: [%s]', len(notifications), notifications)
  119. notification_count += len(notifications)
  120. return notifications, partition, offset