Merge "Use a notifier instead of a direct property assignment"
This commit is contained in:
@@ -62,7 +62,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
|
||||
# pre-existing knowledge of the topics those workers are on to gather
|
||||
# and update this information).
|
||||
self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
|
||||
self._finder.on_worker = self._on_worker
|
||||
self._finder.notifier.register(wt.WorkerFinder.WORKER_ARRIVED,
|
||||
self._on_worker)
|
||||
self._helpers = tu.ThreadBundle()
|
||||
self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
|
||||
after_start=lambda t: self._proxy.wait(),
|
||||
@@ -74,8 +75,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
|
||||
after_join=lambda t: p_worker.reset(),
|
||||
before_start=lambda t: p_worker.reset())
|
||||
|
||||
def _on_worker(self, worker):
|
||||
def _on_worker(self, event_type, details):
|
||||
"""Process new worker that has arrived (and fire off any work)."""
|
||||
worker = details['worker']
|
||||
for request in self._requests_cache.get_waiting_requests(worker):
|
||||
if request.transition_and_log_error(pr.PENDING, logger=LOG):
|
||||
self._publish_request(request, worker)
|
||||
|
||||
@@ -26,6 +26,7 @@ import six
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow import logging
|
||||
from taskflow.types import cache as base
|
||||
from taskflow.types import notifier
|
||||
from taskflow.types import periodic
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import kombu_utils as ku
|
||||
@@ -99,9 +100,12 @@ class TopicWorker(object):
|
||||
class WorkerFinder(object):
|
||||
"""Base class for worker finders..."""
|
||||
|
||||
#: Event type emitted when a new worker arrives.
|
||||
WORKER_ARRIVED = 'worker_arrived'
|
||||
|
||||
def __init__(self):
|
||||
self._cond = threading.Condition()
|
||||
self.on_worker = None
|
||||
self.notifier = notifier.RestrictedNotifier([self.WORKER_ARRIVED])
|
||||
|
||||
@abc.abstractmethod
|
||||
def _total_workers(self):
|
||||
@@ -214,8 +218,8 @@ class ProxyWorkerFinder(WorkerFinder):
|
||||
" total workers are currently known)", worker,
|
||||
self._total_workers())
|
||||
self._cond.notify_all()
|
||||
if self.on_worker is not None and new_or_updated:
|
||||
self.on_worker(worker)
|
||||
if new_or_updated:
|
||||
self.notifier.notify(self.WORKER_ARRIVED, {'worker': worker})
|
||||
|
||||
def clear(self):
|
||||
with self._cond:
|
||||
|
||||
Reference in New Issue
Block a user