diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 75c28855..98ff3a8d 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -15,7 +15,6 @@ # under the License. import logging -import threading from kombu import exceptions as kombu_exc @@ -27,6 +26,7 @@ from taskflow import exceptions as exc from taskflow.utils import async_utils from taskflow.utils import misc from taskflow.utils import reflection +from taskflow.utils import threading_utils as tu LOG = logging.getLogger(__name__) @@ -81,14 +81,6 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): [self._notify_topics]) self._periodic_thread = None - def _make_thread(self, target): - thread = threading.Thread(target=target) - # NOTE(skudriashev): When the main thread is terminated unexpectedly - # and thread is still alive - it will prevent main thread from exiting - # unless the daemon property is set to True. - thread.daemon = True - return thread - def _on_message(self, data, message): """This method is called on incoming message.""" LOG.debug("Got message: %s", data) @@ -226,12 +218,12 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): def start(self): """Start proxy thread (and associated topic notification thread).""" if not _is_alive(self._proxy_thread): - self._proxy_thread = self._make_thread(self._proxy.start) + self._proxy_thread = tu.daemon_thread(self._proxy.start) self._proxy_thread.start() self._proxy.wait() if not _is_alive(self._periodic_thread): self._periodic.reset() - self._periodic_thread = self._make_thread(self._periodic.start) + self._periodic_thread = tu.daemon_thread(self._periodic.start) self._periodic_thread.start() def stop(self): diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py index 0e2d1d05..c669619a 100644 --- a/taskflow/utils/threading_utils.py +++ b/taskflow/utils/threading_utils.py @@ -15,6 +15,7 @@ # under the License. import multiprocessing +import threading import six @@ -34,3 +35,13 @@ def get_optimal_thread_count(): # just setup two threads since it's hard to know what else we # should do in this situation. return 2 + + +def daemon_thread(target, *args, **kwargs): + """Makes a daemon thread that calls the given target when started.""" + thread = threading.Thread(target=target, args=args, kwargs=kwargs) + # NOTE(skudriashev): When the main thread is terminated unexpectedly + # and thread is still alive - it will prevent main thread from exiting + # unless the daemon property is set to True. + thread.daemon = True + return thread