diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 4c4fc060..77c54737 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -190,16 +190,19 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): return async_utils.wait_for_any(fs, timeout) def start(self): - """Start proxy thread.""" + """Start proxy thread (and associated topic notification thread).""" if self._proxy_thread is None: self._proxy_thread = self._make_thread(self._proxy.start) self._proxy_thread.start() self._proxy.wait() + self._notify_timeout.reset() self._notify_thread = self._make_thread(self._notify_topics) self._notify_thread.start() def stop(self): - """Stop proxy, so its thread would be gracefully terminated.""" + """Stop proxy thread (and associated topic notification thread), so + those threads will be gracefully terminated. + """ if self._proxy_thread is not None: if self._proxy_thread.is_alive(): self._notify_timeout.interrupt()