Notification Engine for Monasca
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

retry_engine.py 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
  2. # Copyright 2017 Fujitsu LIMITED
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  13. # implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import json
  17. import time
  18. from monasca_common.kafka import consumer
  19. from monasca_common.kafka import producer
  20. from oslo_config import cfg
  21. from oslo_log import log as logging
  22. from six import PY3
  23. from monasca_notification.common.utils import construct_notification_object
  24. from monasca_notification.common.utils import get_db_repo
  25. from monasca_notification.common.utils import get_statsd_client
  26. from monasca_notification.processors import notification_processor
  27. log = logging.getLogger(__name__)
  28. CONF = cfg.CONF
  29. class RetryEngine(object):
  30. def __init__(self):
  31. self._statsd = get_statsd_client()
  32. self._consumer = consumer.KafkaConsumer(
  33. CONF.kafka.url,
  34. ','.join(CONF.zookeeper.url),
  35. CONF.zookeeper.notification_retry_path,
  36. CONF.kafka.group,
  37. CONF.kafka.notification_retry_topic
  38. )
  39. self._producer = producer.KafkaProducer(CONF.kafka.url)
  40. self._notifier = notification_processor.NotificationProcessor()
  41. self._db_repo = get_db_repo()
  42. def run(self):
  43. for raw_notification in self._consumer:
  44. message = raw_notification[1].message.value
  45. message = message.decode('UTF-8') if PY3 else message
  46. notification_data = json.loads(message)
  47. notification = construct_notification_object(self._db_repo, notification_data)
  48. if notification is None:
  49. self._consumer.commit()
  50. continue
  51. wait_duration = CONF.retry_engine.interval - (
  52. time.time() - notification_data['notification_timestamp'])
  53. if wait_duration > 0:
  54. time.sleep(wait_duration)
  55. sent, failed = self._notifier.send([notification])
  56. if sent:
  57. self._producer.publish(CONF.kafka.notification_topic,
  58. [notification.to_json()])
  59. if failed:
  60. notification.retry_count += 1
  61. notification.notification_timestamp = time.time()
  62. if notification.retry_count < CONF.retry_engine.max_attempts:
  63. log.error(u"retry failed for {} with name {} "
  64. u"at {}. "
  65. u"Saving for later retry.".format(notification.type,
  66. notification.name,
  67. notification.address))
  68. self._producer.publish(CONF.kafka.notification_retry_topic,
  69. [notification.to_json()])
  70. else:
  71. log.error(u"retry failed for {} with name {} "
  72. u"at {} after {} retries. "
  73. u"Giving up on retry."
  74. .format(notification.type,
  75. notification.name,
  76. notification.address,
  77. CONF.retry_engine.max_attempts))
  78. self._consumer.commit()