Neutron drivers for OpenDaylight.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

118 lines
4.1KB

  1. #
  2. # Copyright (C) 2016 Red Hat, Inc.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. #
  16. from neutron_lib import context as neutron_context
  17. from neutron_lib.db import api as db_api
  18. from oslo_log import log as logging
  19. from oslo_service import loopingcall
  20. from networking_odl.db import db
  21. LOG = logging.getLogger(__name__)
  22. class PeriodicTask(object):
  23. def __init__(self, task, interval):
  24. self.task = task
  25. self.phases = []
  26. self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
  27. self.interval = interval
  28. def start(self):
  29. self.timer.start(self.interval, stop_on_exception=False)
  30. def cleanup(self):
  31. # this method is used for unit test to tear down
  32. self.timer.stop()
  33. try:
  34. self.timer.wait()
  35. except AttributeError:
  36. # NOTE(yamahata): workaround
  37. # some tests call this cleanup without calling start
  38. pass
  39. @db_api.retry_if_session_inactive()
  40. @db_api.CONTEXT_WRITER.savepoint
  41. def _set_operation(self, context, operation):
  42. db.update_periodic_task(context, task=self.task,
  43. operation=operation)
  44. def _execute_op(self, operation, context):
  45. op_details = operation.__name__
  46. if operation.__doc__:
  47. op_details += " (%s)" % operation.__doc__
  48. try:
  49. LOG.info("Starting %s phase of periodic task %s.",
  50. op_details, self.task)
  51. self._set_operation(context, operation)
  52. operation(context)
  53. LOG.info("Finished %s phase of %s task.", op_details, self.task)
  54. except Exception:
  55. LOG.exception("Failed during periodic task operation %s.",
  56. op_details)
  57. def task_already_executed_recently(self, context):
  58. return db.was_periodic_task_executed_recently(
  59. context, self.task, self.interval)
  60. @db_api.retry_if_session_inactive()
  61. @db_api.CONTEXT_WRITER.savepoint
  62. def _clear_and_unlock_task(self, context):
  63. db.update_periodic_task(context, task=self.task,
  64. operation=None)
  65. db.unlock_periodic_task(context, self.task)
  66. @db_api.retry_if_session_inactive()
  67. @db_api.CONTEXT_WRITER.savepoint
  68. def _lock_task(self, context):
  69. return db.lock_periodic_task(context, self.task)
  70. def execute_ops(self, forced=False):
  71. LOG.info("Starting %s periodic task.", self.task)
  72. context = neutron_context.get_admin_context()
  73. # Lock make sure that periodic task is executed only after
  74. # specified interval. It makes sure that maintenance tasks
  75. # are not executed back to back.
  76. if not forced and self.task_already_executed_recently(context):
  77. LOG.info("Periodic %s task executed after periodic interval "
  78. "Skipping execution.", self.task)
  79. return
  80. if not self._lock_task(context):
  81. LOG.info("Periodic %s task already running task", self.task)
  82. return
  83. try:
  84. for phase in self.phases:
  85. self._execute_op(phase, context)
  86. finally:
  87. self._clear_and_unlock_task(context)
  88. LOG.info("%s task has been finished", self.task)
  89. def register_operation(self, phase):
  90. """Register a function to be run by the periodic task.
  91. :param phase: Function to call when the thread runs. The function will
  92. receive a DB session to use for DB operations.
  93. """
  94. self.phases.append(phase)
  95. LOG.info("%s phase has been registered in %s task", phase, self.task)