From e9262893f3146f565cbe3feba5d628526b56335e Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 20 Mar 2014 19:50:36 -0700 Subject: [PATCH] Add a timeout object that can be interrupted Add and use this object to make it so that the worker executor notifier can run and also be interrupted when its owning thread is finished (thus allowing it to be finished before the timeout is reached). Change-Id: Ice9f3b74b894dd7a76ab94ba4561b9405eba0206 --- taskflow/engines/worker_based/executor.py | 8 +++---- taskflow/utils/misc.py | 26 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index a8f2da85..d0e05e6c 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -42,7 +42,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): self._on_wait, **kwargs) self._proxy_thread = None self._notify_thread = None - self._notify_event = threading.Event() + self._notify_timeout = misc.Timeout(pr.NOTIFY_PERIOD) def _make_thread(self, target): thread = threading.Thread(target=target) @@ -167,9 +167,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): def _notify_topics(self): """Cyclically publish notify message to each topic.""" LOG.debug("Notify thread started.") - while not self._notify_event.is_set(): + while not self._notify_timeout.is_stopped(): self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) - self._notify_event.wait(pr.NOTIFY_PERIOD) + self._notify_timeout.wait() def execute_task(self, task, task_uuid, arguments, progress_callback=None): @@ -199,7 +199,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): """Stop proxy, so its thread would be gracefully terminated.""" if self._proxy_thread is not None: if self._proxy_thread.is_alive(): - self._notify_event.set() + self._notify_timeout.interrupt() self._notify_thread.join() self._proxy.stop() self._proxy_thread.join() diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 7d6a5f66..c1e55ef2 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -25,6 +25,7 @@ import logging import os import string import sys +import threading import time import traceback @@ -210,6 +211,31 @@ class AttrDict(dict): self[name] = value +class Timeout(object): + """An object which represents a timeout. + + This object has the ability to be interrupted before the actual timeout + is reached. + """ + def __init__(self, timeout): + if timeout < 0: + raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) + self._timeout = timeout + self._event = threading.Event() + + def interrupt(self): + self._event.set() + + def is_stopped(self): + return self._event.is_set() + + def wait(self): + self._event.wait(self._timeout) + + def reset(self): + self._event.clear() + + class ExponentialBackoff(object): """An iterable object that will yield back an exponential delay sequence provided an exponent and a number of items to yield. This object may be