From f2a6acaff06e557cda2639d6f644fb98b1ebe8a2 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 4 Feb 2015 17:16:38 -0800 Subject: [PATCH] Use a notifier instead of a direct property assignment To allow for finders to be shared (across executors) in the near future we need to avoid assigning our callback as the sole/single callback that the finder will use and instead use a notification mechanism (using our notifier type) that can be used to register many callbacks and decouple the emitting of new workers events from the reception of that event. Part of blueprint wbe-worker-info Change-Id: I0ab2ec649f759ec67e15960bdeb83b108174734b --- taskflow/engines/worker_based/executor.py | 6 ++++-- taskflow/engines/worker_based/types.py | 10 +++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index a55229f1..7722432f 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -62,7 +62,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): # pre-existing knowledge of the topics those workers are on to gather # and update this information). self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics) - self._finder.on_worker = self._on_worker + self._finder.notifier.register(wt.WorkerFinder.WORKER_ARRIVED, + self._on_worker) self._helpers = tu.ThreadBundle() self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start), after_start=lambda t: self._proxy.wait(), @@ -74,8 +75,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): after_join=lambda t: p_worker.reset(), before_start=lambda t: p_worker.reset()) - def _on_worker(self, worker): + def _on_worker(self, event_type, details): """Process new worker that has arrived (and fire off any work).""" + worker = details['worker'] for request in self._requests_cache.get_waiting_requests(worker): if request.transition_and_log_error(pr.PENDING, logger=LOG): self._publish_request(request, worker) diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 70185d52..3d53e87a 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -26,6 +26,7 @@ import six from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.types import cache as base +from taskflow.types import notifier from taskflow.types import periodic from taskflow.types import timing as tt from taskflow.utils import kombu_utils as ku @@ -99,9 +100,12 @@ class TopicWorker(object): class WorkerFinder(object): """Base class for worker finders...""" + #: Event type emitted when a new worker arrives. + WORKER_ARRIVED = 'worker_arrived' + def __init__(self): self._cond = threading.Condition() - self.on_worker = None + self.notifier = notifier.RestrictedNotifier([self.WORKER_ARRIVED]) @abc.abstractmethod def _total_workers(self): @@ -214,8 +218,8 @@ class ProxyWorkerFinder(WorkerFinder): " total workers are currently known)", worker, self._total_workers()) self._cond.notify_all() - if self.on_worker is not None and new_or_updated: - self.on_worker(worker) + if new_or_updated: + self.notifier.notify(self.WORKER_ARRIVED, {'worker': worker}) def clear(self): with self._cond: