From 67b9e411536317384dc7fa6f83fefca2208f2be9 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 15 Mar 2016 09:52:03 -0700 Subject: [PATCH] Add periodic jobboard refreshing (incase of sync issues) Related-Bug: #1557107 Change-Id: I42672ef63ef02ec5ec6a842d263d0db83d91fe45 --- taskflow/conductors/backends/impl_executor.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index 772cab2a..f8d624e9 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -24,6 +24,7 @@ except ImportError: from debtcollector import removals from oslo_utils import excutils +from oslo_utils import timeutils import six from taskflow.conductors import base @@ -63,6 +64,13 @@ class ExecutorConductor(base.Conductor): level logger will be used instead). """ + REFRESH_PERIODICITY = 30 + """ + Every 30 seconds the jobboard will be resynced (if for some reason + a watch or set of watches was not received) using the `ensure_fresh` + option to ensure this (for supporting jobboard backends only). + """ + #: Default timeout used to idle/wait when no jobs have been found. WAIT_TIMEOUT = 0.5 @@ -274,10 +282,20 @@ class ExecutorConductor(base.Conductor): # Don't even do any work in the first place... if max_dispatches == 0: raise StopIteration + fresh_period = timeutils.StopWatch( + duration=self.REFRESH_PERIODICITY) + fresh_period.start() while not is_stopped(): any_dispatched = False - for job in itertools.takewhile(self._can_claim_more_jobs, - self._jobboard.iterjobs()): + if fresh_period.expired(): + ensure_fresh = True + fresh_period.restart() + else: + ensure_fresh = False + job_it = itertools.takewhile( + self._can_claim_more_jobs, + self._jobboard.iterjobs(ensure_fresh=ensure_fresh)) + for job in job_it: self._log.debug("Trying to claim job: %s", job) try: self._jobboard.claim(job, self._name)