diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 77c54737..75c28855 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -26,10 +26,46 @@ from taskflow.engines.worker_based import proxy from taskflow import exceptions as exc from taskflow.utils import async_utils from taskflow.utils import misc +from taskflow.utils import reflection LOG = logging.getLogger(__name__) +def _is_alive(thread): + if not thread: + return False + return thread.is_alive() + + +class PeriodicWorker(object): + """Calls a set of functions when activated periodically. + + NOTE(harlowja): the provided timeout object determines the periodicity. + """ + def __init__(self, timeout, functors): + self._timeout = timeout + self._functors = [] + for f in functors: + self._functors.append((f, reflection.get_callable_name(f))) + + def start(self): + while not self._timeout.is_stopped(): + for (f, f_name) in self._functors: + LOG.debug("Calling periodic function '%s'", f_name) + try: + f() + except Exception: + LOG.warn("Failed to call periodic function '%s'", f_name, + exc_info=True) + self._timeout.wait() + + def stop(self): + self._timeout.interrupt() + + def reset(self): + self._timeout.reset() + + class WorkerTaskExecutor(executor.TaskExecutorBase): """Executes tasks on remote workers.""" @@ -41,8 +77,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): self._proxy = proxy.Proxy(uuid, exchange, self._on_message, self._on_wait, **kwargs) self._proxy_thread = None - self._notify_thread = None - self._notify_timeout = misc.Timeout(pr.NOTIFY_PERIOD) + self._periodic = PeriodicWorker(misc.Timeout(pr.NOTIFY_PERIOD), + [self._notify_topics]) + self._periodic_thread = None def _make_thread(self, target): thread = threading.Thread(target=target) @@ -134,7 +171,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): def _submit_task(self, task, task_uuid, action, arguments, progress_callback, timeout=pr.REQUEST_TIMEOUT, **kwargs): - """Submit task request to workers.""" + """Submit task request to a worker.""" request = pr.Request(task, task_uuid, action, arguments, progress_callback, timeout, **kwargs) @@ -168,11 +205,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): request.set_result(failure) def _notify_topics(self): - """Cyclically publish notify message to each topic.""" - LOG.debug("Notify thread started.") - while not self._notify_timeout.is_stopped(): - self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) - self._notify_timeout.wait() + """Cyclically called to publish notify message to each topic.""" + self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) def execute_task(self, task, task_uuid, arguments, progress_callback=None): @@ -191,23 +225,24 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): def start(self): """Start proxy thread (and associated topic notification thread).""" - if self._proxy_thread is None: + if not _is_alive(self._proxy_thread): self._proxy_thread = self._make_thread(self._proxy.start) self._proxy_thread.start() self._proxy.wait() - self._notify_timeout.reset() - self._notify_thread = self._make_thread(self._notify_topics) - self._notify_thread.start() + if not _is_alive(self._periodic_thread): + self._periodic.reset() + self._periodic_thread = self._make_thread(self._periodic.start) + self._periodic_thread.start() def stop(self): """Stop proxy thread (and associated topic notification thread), so those threads will be gracefully terminated. """ + if self._periodic_thread is not None: + self._periodic.stop() + self._periodic_thread.join() + self._periodic_thread = None if self._proxy_thread is not None: - if self._proxy_thread.is_alive(): - self._notify_timeout.interrupt() - self._notify_thread.join() - self._proxy.stop() - self._proxy_thread.join() - self._notify_thread = None + self._proxy.stop() + self._proxy_thread.join() self._proxy_thread = None