diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index a55229f1..7722432f 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -62,7 +62,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): # pre-existing knowledge of the topics those workers are on to gather # and update this information). self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics) - self._finder.on_worker = self._on_worker + self._finder.notifier.register(wt.WorkerFinder.WORKER_ARRIVED, + self._on_worker) self._helpers = tu.ThreadBundle() self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start), after_start=lambda t: self._proxy.wait(), @@ -74,8 +75,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): after_join=lambda t: p_worker.reset(), before_start=lambda t: p_worker.reset()) - def _on_worker(self, worker): + def _on_worker(self, event_type, details): """Process new worker that has arrived (and fire off any work).""" + worker = details['worker'] for request in self._requests_cache.get_waiting_requests(worker): if request.transition_and_log_error(pr.PENDING, logger=LOG): self._publish_request(request, worker) diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 70185d52..3d53e87a 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -26,6 +26,7 @@ import six from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.types import cache as base +from taskflow.types import notifier from taskflow.types import periodic from taskflow.types import timing as tt from taskflow.utils import kombu_utils as ku @@ -99,9 +100,12 @@ class TopicWorker(object): class WorkerFinder(object): """Base class for worker finders...""" + #: Event type emitted when a new worker arrives. + WORKER_ARRIVED = 'worker_arrived' + def __init__(self): self._cond = threading.Condition() - self.on_worker = None + self.notifier = notifier.RestrictedNotifier([self.WORKER_ARRIVED]) @abc.abstractmethod def _total_workers(self): @@ -214,8 +218,8 @@ class ProxyWorkerFinder(WorkerFinder): " total workers are currently known)", worker, self._total_workers()) self._cond.notify_all() - if self.on_worker is not None and new_or_updated: - self.on_worker(worker) + if new_or_updated: + self.notifier.notify(self.WORKER_ARRIVED, {'worker': worker}) def clear(self): with self._cond: