Notification Engine for Monasca
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

retry_engine.py 3.6KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
  2. # Copyright 2017 Fujitsu LIMITED
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  13. # implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import json
  17. import time
  18. from oslo_config import cfg
  19. from oslo_log import log as logging
  20. from monasca_common.kafka import consumer
  21. from monasca_common.kafka import producer
  22. from monasca_notification.common.utils import construct_notification_object
  23. from monasca_notification.common.utils import get_db_repo
  24. from monasca_notification.common.utils import get_statsd_client
  25. from monasca_notification.processors import notification_processor
  26. log = logging.getLogger(__name__)
  27. CONF = cfg.CONF
  28. class RetryEngine(object):
  29. def __init__(self):
  30. self._statsd = get_statsd_client()
  31. self._consumer = consumer.KafkaConsumer(
  32. CONF.kafka.url,
  33. ','.join(CONF.zookeeper.url),
  34. CONF.zookeeper.notification_retry_path,
  35. CONF.kafka.group,
  36. CONF.kafka.notification_retry_topic
  37. )
  38. self._producer = producer.KafkaProducer(CONF.kafka.url)
  39. self._notifier = notification_processor.NotificationProcessor()
  40. self._db_repo = get_db_repo()
  41. def run(self):
  42. for raw_notification in self._consumer:
  43. message = raw_notification[1].message.value
  44. notification_data = json.loads(message)
  45. notification = construct_notification_object(self._db_repo, notification_data)
  46. if notification is None:
  47. self._consumer.commit()
  48. continue
  49. wait_duration = CONF.retry_engine.interval - (
  50. time.time() - notification_data['notification_timestamp'])
  51. if wait_duration > 0:
  52. time.sleep(wait_duration)
  53. sent, failed = self._notifier.send([notification])
  54. if sent:
  55. self._producer.publish(CONF.kafka.notification_topic,
  56. [notification.to_json()])
  57. if failed:
  58. notification.retry_count += 1
  59. notification.notification_timestamp = time.time()
  60. if notification.retry_count < CONF.retry_engine.max_attempts:
  61. log.error(u"retry failed for {} with name {} "
  62. u"at {}. "
  63. u"Saving for later retry.".format(notification.type,
  64. notification.name,
  65. notification.address))
  66. self._producer.publish(CONF.kafka.notification_retry_topic,
  67. [notification.to_json()])
  68. else:
  69. log.error(u"retry failed for {} with name {} "
  70. u"at {} after {} retries. "
  71. u"Giving up on retry."
  72. .format(notification.type,
  73. notification.name,
  74. notification.address,
  75. CONF.retry_engine.max_attempts))
  76. self._consumer.commit()