Merge "Create a periodic worker helper class"

This commit is contained in:
Jenkins
2014-04-16 08:45:07 +00:00
committed by Gerrit Code Review

View File

@@ -26,10 +26,46 @@ from taskflow.engines.worker_based import proxy
from taskflow import exceptions as exc
from taskflow.utils import async_utils
from taskflow.utils import misc
from taskflow.utils import reflection
LOG = logging.getLogger(__name__)
def _is_alive(thread):
if not thread:
return False
return thread.is_alive()
class PeriodicWorker(object):
"""Calls a set of functions when activated periodically.
NOTE(harlowja): the provided timeout object determines the periodicity.
"""
def __init__(self, timeout, functors):
self._timeout = timeout
self._functors = []
for f in functors:
self._functors.append((f, reflection.get_callable_name(f)))
def start(self):
while not self._timeout.is_stopped():
for (f, f_name) in self._functors:
LOG.debug("Calling periodic function '%s'", f_name)
try:
f()
except Exception:
LOG.warn("Failed to call periodic function '%s'", f_name,
exc_info=True)
self._timeout.wait()
def stop(self):
self._timeout.interrupt()
def reset(self):
self._timeout.reset()
class WorkerTaskExecutor(executor.TaskExecutorBase):
"""Executes tasks on remote workers."""
@@ -41,8 +77,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
self._proxy = proxy.Proxy(uuid, exchange, self._on_message,
self._on_wait, **kwargs)
self._proxy_thread = None
self._notify_thread = None
self._notify_timeout = misc.Timeout(pr.NOTIFY_PERIOD)
self._periodic = PeriodicWorker(misc.Timeout(pr.NOTIFY_PERIOD),
[self._notify_topics])
self._periodic_thread = None
def _make_thread(self, target):
thread = threading.Thread(target=target)
@@ -134,7 +171,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback, timeout=pr.REQUEST_TIMEOUT, **kwargs):
"""Submit task request to workers."""
"""Submit task request to a worker."""
request = pr.Request(task, task_uuid, action, arguments,
progress_callback, timeout, **kwargs)
@@ -168,11 +205,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
request.set_result(failure)
def _notify_topics(self):
"""Cyclically publish notify message to each topic."""
LOG.debug("Notify thread started.")
while not self._notify_timeout.is_stopped():
self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
self._notify_timeout.wait()
"""Cyclically called to publish notify message to each topic."""
self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
def execute_task(self, task, task_uuid, arguments,
progress_callback=None):
@@ -191,23 +225,24 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
def start(self):
"""Start proxy thread (and associated topic notification thread)."""
if self._proxy_thread is None:
if not _is_alive(self._proxy_thread):
self._proxy_thread = self._make_thread(self._proxy.start)
self._proxy_thread.start()
self._proxy.wait()
self._notify_timeout.reset()
self._notify_thread = self._make_thread(self._notify_topics)
self._notify_thread.start()
if not _is_alive(self._periodic_thread):
self._periodic.reset()
self._periodic_thread = self._make_thread(self._periodic.start)
self._periodic_thread.start()
def stop(self):
"""Stop proxy thread (and associated topic notification thread), so
those threads will be gracefully terminated.
"""
if self._periodic_thread is not None:
self._periodic.stop()
self._periodic_thread.join()
self._periodic_thread = None
if self._proxy_thread is not None:
if self._proxy_thread.is_alive():
self._notify_timeout.interrupt()
self._notify_thread.join()
self._proxy.stop()
self._proxy_thread.join()
self._notify_thread = None
self._proxy.stop()
self._proxy_thread.join()
self._proxy_thread = None